Files
blackroad-operating-system/services/analytics/anomaly_rules.py
Alexa Louise 9644737ba7 feat: Add domain architecture and extract core services from Prism Console
## Domain Architecture
- Complete domain-to-service mapping for 16 verified domains
- Subdomain architecture for blackroad.systems and blackroad.io
- GitHub organization mapping (BlackRoad-OS repos)
- Railway service-to-domain configuration
- DNS configuration templates for Cloudflare

## Extracted Services

### AIops Service (services/aiops/)
- Canary analysis for deployment validation
- Config drift detection
- Event correlation engine
- Auto-remediation with runbook mapping
- SLO budget management

### Analytics Service (services/analytics/)
- Rule-based anomaly detection with safe expression evaluation
- Cohort analysis with multi-metric aggregation
- Decision engine with credit budget constraints
- Narrative report generation

### Codex Governance (services/codex/)
- 82+ governance principles (entries)
- Codex Pantheon with 48+ agent archetypes
- Manifesto defining ethical framework

## Integration Points
- AIops → infra.blackroad.systems (blackroad-os-infra)
- Analytics → core.blackroad.systems (blackroad-os-core)
- Codex → operator.blackroad.systems (blackroad-os-operator)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 13:39:08 -06:00

99 lines
3.2 KiB
Python

import ast
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List
import yaml
from .utils import increment, log_event, validate
ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = ROOT / "samples" / "metrics"
ART = ROOT / "artifacts" / "anomalies"
MIN_DATA_POINTS = 5
class SafeEval(ast.NodeVisitor):
allowed = (
ast.Expression,
ast.Compare,
ast.Name,
ast.Load,
ast.BinOp,
ast.UnaryOp,
ast.Num,
ast.operator,
ast.unaryop,
ast.cmpop,
)
def visit(self, node): # type: ignore[override]
if not isinstance(node, self.allowed):
raise ValueError("unsafe expression")
return super().visit(node)
def _eval(expr: str, ctx: Dict[str, float]) -> bool:
tree = ast.parse(expr, mode="eval")
SafeEval().visit(tree)
return bool(eval(compile(tree, "<expr>", "eval"), {"__builtins__": {}}, ctx))
def _load_series(metric: str, group_by: str) -> Dict[str, List[Dict[str, float]]]:
data = json.loads(Path(DATA_DIR / f"{metric}_{group_by}.json").read_text())
buckets: Dict[str, List[Dict[str, float]]] = defaultdict(list)
for row in data:
buckets[row[group_by]].append(row)
for rows in buckets.values():
rows.sort(key=lambda r: r["date"])
return buckets
def detect_anomalies(metric: str, group_by: str, window: str, rule: Dict[str, str]) -> List[Dict]:
series = _load_series(metric, group_by)
anomalies: List[Dict] = []
for grp, rows in series.items():
if len(rows) < MIN_DATA_POINTS:
continue
trailing = rows[-MIN_DATA_POINTS:-1]
mean = sum(r["value"] for r in trailing) / len(trailing)
current = rows[-1]["value"]
prev = rows[-2]["value"]
pct_drop = (mean - current) / mean * 100 if mean else 0
delta = current - prev
ctx = {"value": current, "pct_drop": pct_drop, "delta": delta}
if _eval(rule["condition"], ctx):
anomalies.append(
{
"metric": metric,
"group": grp,
"severity": rule["severity"],
"value": current,
"pct_drop": round(pct_drop, 2),
}
)
return anomalies
def run_rules(rules_path: Path, window: str) -> List[Dict]:
rules = yaml.safe_load(rules_path.read_text())["rules"]
all_anoms: List[Dict] = []
for rule in rules:
all_anoms.extend(
detect_anomalies(rule["metric"], rule["group_by"], window, rule)
)
ts = datetime.utcnow().strftime("%Y%m%d%H%M%S")
ART.mkdir(parents=True, exist_ok=True)
out_path = ART / f"{ts}.json"
storage_path = ART / "latest.json"
out_path.write_text(json.dumps(all_anoms, indent=2))
storage_path.write_text(json.dumps(all_anoms, indent=2))
summary = ART / f"{ts}.md"
summary.write_text("\n".join(f"- {a['metric']} {a['group']}" for a in all_anoms))
validate(all_anoms, "anomaly.schema.json")
increment("anomaly_detect")
log_event({"type": "anomaly_detect", "rules": str(rules_path), "lineage": [r.get("metric") for r in rules]})
return all_anoms