mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-17 00:57:12 -05:00
## Domain Architecture - Complete domain-to-service mapping for 16 verified domains - Subdomain architecture for blackroad.systems and blackroad.io - GitHub organization mapping (BlackRoad-OS repos) - Railway service-to-domain configuration - DNS configuration templates for Cloudflare ## Extracted Services ### AIops Service (services/aiops/) - Canary analysis for deployment validation - Config drift detection - Event correlation engine - Auto-remediation with runbook mapping - SLO budget management ### Analytics Service (services/analytics/) - Rule-based anomaly detection with safe expression evaluation - Cohort analysis with multi-metric aggregation - Decision engine with credit budget constraints - Narrative report generation ### Codex Governance (services/codex/) - 82+ governance principles (entries) - Codex Pantheon with 48+ agent archetypes - Manifesto defining ethical framework ## Integration Points - AIops → infra.blackroad.systems (blackroad-os-infra) - Analytics → core.blackroad.systems (blackroad-os-core) - Codex → operator.blackroad.systems (blackroad-os-operator) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
import ast
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
|
|
import yaml
|
|
|
|
from .utils import increment, log_event, validate
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DATA_DIR = ROOT / "samples" / "metrics"
|
|
ART = ROOT / "artifacts" / "anomalies"
|
|
MIN_DATA_POINTS = 5
|
|
|
|
|
|
class SafeEval(ast.NodeVisitor):
|
|
allowed = (
|
|
ast.Expression,
|
|
ast.Compare,
|
|
ast.Name,
|
|
ast.Load,
|
|
ast.BinOp,
|
|
ast.UnaryOp,
|
|
ast.Num,
|
|
ast.operator,
|
|
ast.unaryop,
|
|
ast.cmpop,
|
|
)
|
|
|
|
def visit(self, node): # type: ignore[override]
|
|
if not isinstance(node, self.allowed):
|
|
raise ValueError("unsafe expression")
|
|
return super().visit(node)
|
|
|
|
|
|
def _eval(expr: str, ctx: Dict[str, float]) -> bool:
|
|
tree = ast.parse(expr, mode="eval")
|
|
SafeEval().visit(tree)
|
|
return bool(eval(compile(tree, "<expr>", "eval"), {"__builtins__": {}}, ctx))
|
|
|
|
|
|
def _load_series(metric: str, group_by: str) -> Dict[str, List[Dict[str, float]]]:
|
|
data = json.loads(Path(DATA_DIR / f"{metric}_{group_by}.json").read_text())
|
|
buckets: Dict[str, List[Dict[str, float]]] = defaultdict(list)
|
|
for row in data:
|
|
buckets[row[group_by]].append(row)
|
|
for rows in buckets.values():
|
|
rows.sort(key=lambda r: r["date"])
|
|
return buckets
|
|
|
|
|
|
def detect_anomalies(metric: str, group_by: str, window: str, rule: Dict[str, str]) -> List[Dict]:
|
|
series = _load_series(metric, group_by)
|
|
anomalies: List[Dict] = []
|
|
for grp, rows in series.items():
|
|
if len(rows) < MIN_DATA_POINTS:
|
|
continue
|
|
trailing = rows[-MIN_DATA_POINTS:-1]
|
|
mean = sum(r["value"] for r in trailing) / len(trailing)
|
|
current = rows[-1]["value"]
|
|
prev = rows[-2]["value"]
|
|
pct_drop = (mean - current) / mean * 100 if mean else 0
|
|
delta = current - prev
|
|
ctx = {"value": current, "pct_drop": pct_drop, "delta": delta}
|
|
if _eval(rule["condition"], ctx):
|
|
anomalies.append(
|
|
{
|
|
"metric": metric,
|
|
"group": grp,
|
|
"severity": rule["severity"],
|
|
"value": current,
|
|
"pct_drop": round(pct_drop, 2),
|
|
}
|
|
)
|
|
return anomalies
|
|
|
|
|
|
def run_rules(rules_path: Path, window: str) -> List[Dict]:
|
|
rules = yaml.safe_load(rules_path.read_text())["rules"]
|
|
all_anoms: List[Dict] = []
|
|
for rule in rules:
|
|
all_anoms.extend(
|
|
detect_anomalies(rule["metric"], rule["group_by"], window, rule)
|
|
)
|
|
ts = datetime.utcnow().strftime("%Y%m%d%H%M%S")
|
|
ART.mkdir(parents=True, exist_ok=True)
|
|
out_path = ART / f"{ts}.json"
|
|
storage_path = ART / "latest.json"
|
|
out_path.write_text(json.dumps(all_anoms, indent=2))
|
|
storage_path.write_text(json.dumps(all_anoms, indent=2))
|
|
summary = ART / f"{ts}.md"
|
|
summary.write_text("\n".join(f"- {a['metric']} {a['group']}" for a in all_anoms))
|
|
validate(all_anoms, "anomaly.schema.json")
|
|
increment("anomaly_detect")
|
|
log_event({"type": "anomaly_detect", "rules": str(rules_path), "lineage": [r.get("metric") for r in rules]})
|
|
return all_anoms
|