mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-17 04:57:15 -05:00
## Domain Architecture - Complete domain-to-service mapping for 16 verified domains - Subdomain architecture for blackroad.systems and blackroad.io - GitHub organization mapping (BlackRoad-OS repos) - Railway service-to-domain configuration - DNS configuration templates for Cloudflare ## Extracted Services ### AIops Service (services/aiops/) - Canary analysis for deployment validation - Config drift detection - Event correlation engine - Auto-remediation with runbook mapping - SLO budget management ### Analytics Service (services/analytics/) - Rule-based anomaly detection with safe expression evaluation - Cohort analysis with multi-metric aggregation - Decision engine with credit budget constraints - Narrative report generation ### Codex Governance (services/codex/) - 82+ governance principles (entries) - Codex Pantheon with 48+ agent archetypes - Manifesto defining ethical framework ## Integration Points - AIops → infra.blackroad.systems (blackroad-os-infra) - Analytics → core.blackroad.systems (blackroad-os-core) - Codex → operator.blackroad.systems (blackroad-os-operator) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
66 lines
2.2 KiB
Python
66 lines
2.2 KiB
Python
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
|
|
from tools import storage
|
|
|
|
from .utils import increment, log_event
|
|
|
|
# Directory two levels above this file; the paths below are resolved against it.
ROOT = Path(__file__).resolve().parent.parent
# Where cohort definitions are stored, one ``<name>.json`` file per cohort.
COHORTS_DIR = ROOT.joinpath("artifacts", "cohorts")
# Where the ``<table>.json`` row files read by ``cohort_view`` live.
DATA_DIR = ROOT.joinpath("samples", "data")
|
|
|
|
|
|
def define_cohort(name: str, criteria: Dict[str, str]) -> None:
    """Store a cohort definition as ``<name>.json`` inside ``COHORTS_DIR``.

    Args:
        name: Cohort identifier; used verbatim as the file stem.
        criteria: Field-to-value mapping describing the cohort. It is handed
            to ``storage.write`` unchanged; how it is serialized is decided
            by that helper, which is not visible from this module.
    """
    # ``storage.write`` is given a plain string path rather than a ``Path``.
    storage.write(str(COHORTS_DIR / f"{name}.json"), criteria)
|
|
|
|
|
|
def _period(ts: str, window: str) -> str:
    """Map an ISO-8601 timestamp to the label of the period containing it.

    Args:
        ts: Timestamp string accepted by ``datetime.fromisoformat``.
        window: ``"M"`` for calendar month (``YYYY-MM``), ``"W"`` for ISO
            week (``YYYY-Www``), or ``"Q"`` for calendar quarter
            (``YYYY-Qn``).

    Returns:
        The period label for ``ts``.

    Raises:
        ValueError: If ``ts`` cannot be parsed or ``window`` is not one of
            the supported codes.
    """
    # Parse first so a malformed timestamp is reported even when the window
    # code is also invalid.
    moment = datetime.fromisoformat(ts)

    if window == "M":
        return moment.strftime("%Y-%m")

    if window == "W":
        # The ISO year can differ from the calendar year around New Year, so
        # both parts of the label come from the ISO calendar.
        iso_year, iso_week, _ = moment.isocalendar()
        return f"{iso_year}-W{iso_week:02d}"

    if window == "Q":
        quarter = (moment.month + 2) // 3
        return f"{moment.year}-Q{quarter}"

    raise ValueError("unknown window")
|
|
|
|
|
|
def _mean(rows, field, digits):
    """Return the mean of ``field`` over ``rows``, rounded to ``digits`` places.

    An empty ``rows`` yields ``0.0`` instead of raising ``ZeroDivisionError``,
    which matches how ``gross_margin_pct`` reports 0 when there is no revenue.
    """
    if not rows:
        return 0.0
    return round(sum(r[field] for r in rows) / len(rows), digits)


# Aggregators keyed by metric name. Each callable takes the list of row dicts
# belonging to one period bucket and returns a single number.
METRIC_FUNCS = {
    # Plain total of the "revenue" field.
    "revenue": lambda rows: sum(r["revenue"] for r in rows),
    # (revenue - cost) / revenue as a percentage, 2 decimals; 0 when total
    # revenue is zero (which also covers an empty bucket).
    "gross_margin_pct": lambda rows: (
        round(
            (sum(r["revenue"] - r["cost"] for r in rows) / revenue_sum * 100)
            if (revenue_sum := sum(r["revenue"] for r in rows))
            else 0,
            2,
        )
    ),
    # Simple arithmetic means; only the rounding precision differs per metric.
    "nps": lambda rows: _mean(rows, "nps", 2),
    "return_rate": lambda rows: _mean(rows, "return_rate", 4),
    "uptime": lambda rows: _mean(rows, "uptime", 3),
    "mttr": lambda rows: _mean(rows, "mttr", 2),
}
|
|
|
|
|
|
def cohort_view(table: str, cohort_name: str, metrics: List[str], window: str) -> List[Dict[str, float]]:
    """Aggregate a table's rows for one cohort into per-period metric records.

    The cohort's criteria are loaded from ``COHORTS_DIR/<cohort_name>.json``
    and the rows from ``DATA_DIR/<table>.json``. Rows whose fields equal every
    criteria value are grouped by the period of their ``"date"`` field, and
    each requested metric is computed per group via ``METRIC_FUNCS``.

    Args:
        table: Stem of the JSON file under ``DATA_DIR`` holding the rows.
        cohort_name: Stem of the cohort definition file under ``COHORTS_DIR``.
        metrics: Names of the metrics to compute; each must be a key of
            ``METRIC_FUNCS``.
        window: Period code passed to ``_period`` (``"M"``, ``"W"`` or ``"Q"``).

    Returns:
        One dict per period, ordered by period label. Each dict has a
        ``"period"`` key (a string label, despite the ``float`` value type in
        the annotation) plus one key per requested metric.

    Raises:
        KeyError: If a matching row has no ``"date"`` field, or a requested
            metric is not in ``METRIC_FUNCS`` (only detected when at least
            one row matches the cohort).
        ValueError: If ``window`` is not supported or a row's date cannot be
            parsed (only detected when at least one row matches the cohort).
    """
    criteria = json.loads(storage.read(str(COHORTS_DIR / f"{cohort_name}.json")))
    # JSON text is UTF-8; decode explicitly instead of relying on the
    # platform's locale encoding.
    rows = json.loads((DATA_DIR / f"{table}.json").read_text(encoding="utf-8"))

    # A row belongs to the cohort when every criteria field matches exactly;
    # a missing field compares as ``None`` and therefore does not match.
    filtered = [r for r in rows if all(r.get(k) == v for k, v in criteria.items())]

    buckets: Dict[str, List[Dict]] = defaultdict(list)
    for r in filtered:
        buckets[_period(r["date"], window)].append(r)

    out: List[Dict[str, float]] = []
    # Period labels are unique dict keys, so sorting the items orders by label.
    for period, bucket in sorted(buckets.items()):
        rec = {"period": period}
        for m in metrics:
            rec[m] = METRIC_FUNCS[m](bucket)
        out.append(rec)

    # Record the run through the project's counter and event helpers.
    increment("cohort_run")
    log_event({"type": "cohort_run", "cohort": cohort_name, "table": table})
    return out
|