blackroad-operating-system/services/analytics/utils.py
Alexa Louise 9644737ba7 feat: Add domain architecture and extract core services from Prism Console
## Domain Architecture
- Complete domain-to-service mapping for 16 verified domains
- Subdomain architecture for blackroad.systems and blackroad.io
- GitHub organization mapping (BlackRoad-OS repos)
- Railway service-to-domain configuration
- DNS configuration templates for Cloudflare

## Extracted Services

### AIops Service (services/aiops/)
- Canary analysis for deployment validation
- Config drift detection
- Event correlation engine
- Auto-remediation with runbook mapping
- SLO budget management

### Analytics Service (services/analytics/)
- Rule-based anomaly detection with safe expression evaluation
- Cohort analysis with multi-metric aggregation
- Decision engine with credit budget constraints
- Narrative report generation

### Codex Governance (services/codex/)
- 82+ governance principles (entries)
- Codex Pantheon with 48+ agent archetypes
- Manifesto defining ethical framework

## Integration Points
- AIops → infra.blackroad.systems (blackroad-os-infra)
- Analytics → core.blackroad.systems (blackroad-os-core)
- Codex → operator.blackroad.systems (blackroad-os-operator)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 13:39:08 -06:00


import hashlib
import json
from pathlib import Path
from typing import Any

import jsonschema

from tools import storage

# Paths are resolved relative to the directory one level above this file's folder.
ROOT = Path(__file__).resolve().parents[1]
SCHEMAS = ROOT / "schemas"
MEMORY = ROOT / "orchestrator" / "memory.jsonl"
METRICS = ROOT / "artifacts" / "metrics.json"


def validate(instance: Any, schema_name: str) -> None:
    """Validate instance against the JSON schema file schema_name in SCHEMAS."""
    schema_path = SCHEMAS / schema_name
    schema = json.loads(schema_path.read_text())
    jsonschema.validate(instance=instance, schema=schema)


def log_event(event: dict) -> None:
    """Write event to MEMORY with a SHA-256 digest of its sorted-key JSON."""
    text = json.dumps(event, sort_keys=True)
    sig = hashlib.sha256(text.encode("utf-8")).hexdigest()
    storage.write(str(MEMORY), {**event, "signature": sig})


def increment(metric: str) -> None:
    """Add 1 to the named counter in METRICS, starting from 0 if it is absent."""
    data = json.loads(storage.read(str(METRICS)) or "{}")
    data[metric] = data.get(metric, 0) + 1
    storage.write(str(METRICS), data)
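
The `signature` field written by `log_event` can be checked later by recomputing the same digest over the record with `signature` removed. The sketch below is illustrative only: `sign_event` and `verify_event` are hypothetical helper names that do not exist in this file, and the example skips `tools.storage` entirely because its read format is not shown here.

```python
import hashlib
import json


def sign_event(event: dict) -> dict:
    """Mirror the signing step of log_event (hypothetical helper, no storage call)."""
    text = json.dumps(event, sort_keys=True)
    sig = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return {**event, "signature": sig}


def verify_event(record: dict) -> bool:
    """Recompute the digest over every field except "signature" and compare."""
    payload = {k: v for k, v in record.items() if k != "signature"}
    text = json.dumps(payload, sort_keys=True)
    expected = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return record.get("signature") == expected


record = sign_event({"type": "metric", "name": "requests", "value": 1})
print(verify_event(record))   # digest matches the untouched record

record["value"] = 2           # simulate a modified record
print(verify_event(record))   # digest no longer matches
```

Because the digest is an unkeyed SHA-256 hash, a check like this detects accidental corruption but not deliberate tampering: anyone who can rewrite a record can also recompute its signature. An event that already carries its own `signature` key would also fail this check, since `log_event` overwrites that key after hashing.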