Files
blackroad/fleet/alice/opt-blackroad/stats-proxy.py
Alexa Amundson 78fbe80f2a Initial monorepo — everything BlackRoad in one place
bin/       230 CLI tools (ask-*, br-*, agent-*, roadid, carpool)
scripts/   99 automation scripts
fleet/     Node configs and deployment
workers/   Cloudflare Worker sources (roadpay, road-search, squad webhooks)
roadc/     RoadC programming language
roadnet/   Mesh network (5 APs, WireGuard)
operator/  Memory system scripts
config/    System configs
dotfiles/  Shell configs
docs/      Documentation

BlackRoad OS — Pave Tomorrow.

RoadChain-SHA2048: d1a24f55318d338b
RoadChain-Identity: alexa@sovereign
RoadChain-Full: d1a24f55318d338b24b60bad7be39286379c76ae5470817482100cb0ddbbcb97e147d07ac7243da0a9f0363e4e5c833d612b9c0df3a3cd20802465420278ef74875a5b77f55af6fe42a931b8b635b3d0d0b6bde9abf33dc42eea52bc03c951406d8cbe49f1a3d29b26a94dade05e9477f34a7d4d4c6ec4005c3c2ac54e73a68440c512c8e83fd9b1fe234750b898ef8f4032c23db173961fe225e67a0432b5293a9714f76c5c57ed5fdf35b9fb40fd73c03ebf88b7253c6a0575f5afb6a6b49b3bda310602fb1ef676859962dad2aebbb2875814b30eee0a8ba195e482d4cbc91d8819e7f38f6db53e8063401649c77bb994371473cabfb917fb53e8cbe73d60
2026-03-14 17:08:41 -05:00

570 lines
22 KiB
Python

#!/usr/bin/env python3
"""BlackRoad Fleet Telemetry — full infrastructure collector.
Runs on each Pi node, collects EVERYTHING, pushes to Worker constellation."""
import http.server
import json
import os
import time
import urllib.request
import urllib.error
import hashlib
import threading
import socket
import subprocess
import re
# ─── CONFIGURATION ───────────────────────────────────────────────
PORT = 7890  # local HTTP port the telemetry server listens on
CACHE_DIR = "/tmp/blackroad-stats-cache"  # on-disk cache for remote API responses
CACHE_TTL = 1800  # cache lifetime in seconds (30 min) for both disk and in-memory caches
# API tokens come from the environment; empty string means "unauthenticated".
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
CF_API_TOKEN = os.environ.get("CF_API_TOKEN", "")
HOSTNAME = socket.gethostname()  # identifies which fleet node produced a payload
# GitHub organizations polled by get_github_full_stats().
GITHUB_ORGS = [
"BlackRoad-OS", "BlackRoad-AI", "Blackbox-Enterprises", "BlackRoad-Labs",
"BlackRoad-Cloud", "BlackRoad-Ventures", "BlackRoad-Foundation", "BlackRoad-Media",
"BlackRoad-Hardware", "BlackRoad-Education", "BlackRoad-Gov", "BlackRoad-Security",
"BlackRoad-Interactive", "BlackRoad-Archive", "BlackRoad-Studio", "BlackRoad-OS-Inc"
]
GITHUB_USER = "blackboxprogramming"  # personal account counted alongside the orgs
CF_ACCOUNT_ID = "848cf0b18d51e0170e0d1537aec3505a"  # Cloudflare account id (not a secret)
# Raspberry Pi fleet: LAN IP, WireGuard mesh IP, and role of each node.
PI_NODES = {
"alice": {"ip": "192.168.4.49", "wg": "10.8.0.6", "role": "gateway"},
"cecilia": {"ip": "192.168.4.96", "wg": "10.8.0.3", "role": "ai-core"},
"octavia": {"ip": "192.168.4.97", "wg": "10.8.0.4", "role": "gitea-swarm"},
"aria": {"ip": "192.168.4.98", "wg": "10.8.0.7", "role": "portainer"},
}
# Cloud droplets reachable over the WireGuard mesh only.
DROPLETS = {
"gematria": {"ip": "10.8.0.8", "role": "compute"},
"anastasia": {"ip": "10.8.0.1", "role": "hub"},
}
os.makedirs(CACHE_DIR, exist_ok=True)  # ensure the disk cache exists before first fetch
def run_cmd(cmd, timeout=10):
    """Run *cmd* through the shell and return its stripped stdout.

    Returns "" on timeout or process-level failure instead of raising, so
    callers can treat "no output" and "command failed" uniformly.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
        return r.stdout.strip()
    except (subprocess.SubprocessError, OSError):
        # Narrowed from a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit and hid real bugs.
        return ""
def cache_key(url):
    """Map *url* to its on-disk cache file path (md5 of the URL, ``.json`` suffix)."""
    digest = hashlib.md5(url.encode()).hexdigest()
    return os.path.join(CACHE_DIR, f"{digest}.json")
def cached_fetch(url, headers=None, ttl=CACHE_TTL):
    """Fetch JSON from *url*, caching the parsed response on disk for *ttl* seconds.

    On network/HTTP/JSON failure the stale cached copy is served if one
    exists; returns None when there is neither a live result nor a cache.
    """
    key = cache_key(url)
    # Fresh cache hit — but tolerate a corrupt/truncated cache file by
    # falling through to a real fetch instead of crashing (the original
    # json.load here was unguarded).
    if os.path.exists(key) and time.time() - os.path.getmtime(key) < ttl:
        try:
            with open(key) as f:
                return json.load(f)
        except (OSError, ValueError):
            pass
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read().decode())
        with open(key, "w") as f:
            json.dump(data, f)
        return data
    except Exception:
        # Narrowed from bare `except:`; any fetch failure falls back to the
        # stale cache — deliberately best-effort, never raises to callers.
        if os.path.exists(key):
            try:
                with open(key) as f:
                    return json.load(f)
            except (OSError, ValueError):
                return None
        return None
def gh_headers():
    """Build GitHub API request headers, attaching the token when configured."""
    headers = {
        "User-Agent": "BlackRoad-Fleet",
        "Accept": "application/vnd.github.v3+json",
    }
    if GITHUB_TOKEN:
        headers["Authorization"] = f"token {GITHUB_TOKEN}"
    return headers
def cf_headers():
    """Cloudflare auth header; empty dict when no API token is configured."""
    if not CF_API_TOKEN:
        return {}
    return {"Authorization": f"Bearer {CF_API_TOKEN}"}
# ─── SYSTEM TELEMETRY ────────────────────────────────────────────
def get_system_stats():
    """Collect local node metrics: CPU load, RAM, disk, temperature, uptime, services.

    Every probe is best-effort — a command that produces no output simply
    leaves its keys out of the returned dict.
    """
    stats = {"hostname": HOSTNAME, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}
    # CPU
    stats["cpu_cores"] = int(run_cmd("nproc") or "0")
    load = run_cmd("cat /proc/loadavg")
    if load:
        parts = load.split()
        stats["load_1m"] = float(parts[0])
        stats["load_5m"] = float(parts[1])
        stats["load_15m"] = float(parts[2])
    # Memory in bytes: total, used, available (columns 2/3/7 of `free -b`)
    mem = run_cmd("free -b | awk '/Mem/{print $2,$3,$7}'")
    if mem:
        parts = mem.split()
        stats["ram_total_bytes"] = int(parts[0])
        stats["ram_used_bytes"] = int(parts[1])
        stats["ram_available_bytes"] = int(parts[2])
        stats["ram_percent"] = round(int(parts[1]) / int(parts[0]) * 100, 1)
    # Root filesystem usage
    disk = run_cmd("df -B1 / | awk 'NR==2{print $2,$3,$4,$5}'")
    if disk:
        parts = disk.split()
        stats["disk_total_bytes"] = int(parts[0])
        stats["disk_used_bytes"] = int(parts[1])
        stats["disk_available_bytes"] = int(parts[2])
        stats["disk_percent"] = parts[3]  # kept as df's "NN%" string (unlike ram_percent)
    # SoC temperature (Raspberry Pi `vcgencmd`).  BUGFIX: guard the regex —
    # the old code called .group(1) unconditionally and raised AttributeError
    # whenever the output contained "temp=" without a numeric value.
    temp = run_cmd("vcgencmd measure_temp 2>/dev/null")
    match = re.search(r"temp=([\d.]+)", temp or "")
    if match:
        stats["temperature_c"] = float(match.group(1))
    # Uptime
    uptime_s = run_cmd("cat /proc/uptime")
    if uptime_s:
        stats["uptime_seconds"] = int(float(uptime_s.split()[0]))
        days = stats["uptime_seconds"] // 86400
        hours = (stats["uptime_seconds"] % 86400) // 3600
        stats["uptime_human"] = f"{days}d {hours}h"
    # Network / services (counts only; 0 when the tools are unavailable)
    stats["listening_ports"] = int(run_cmd("ss -tlnp 2>/dev/null | grep -c LISTEN") or "0")
    stats["running_services"] = int(run_cmd("systemctl list-units --type=service --state=running --no-pager 2>/dev/null | grep -c running") or "0")
    return stats
# ─── OLLAMA / AI MODELS ─────────────────────────────────────────
def get_ollama_stats():
    """Inventory Ollama models from the local daemon only (Ollama binds to 127.0.0.1).

    Remote nodes are listed with ``online: None`` — each node reports its own
    models when its own collector runs.
    """
    nodes = {}
    try:
        req = urllib.request.Request("http://127.0.0.1:11434/api/tags")
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read().decode())
        models = []
        total_size = 0
        for m in data.get("models", []):
            sz = m.get("size", 0)
            total_size += sz
            details = m.get("details", {})
            models.append({
                "name": m.get("name", ""),  # .get() so one malformed entry can't mark the node offline
                "size_bytes": sz,
                "size_gb": round(sz / (1024**3), 2),
                "family": details.get("family", "unknown"),
                "params": details.get("parameter_size", "unknown"),
                "quantization": details.get("quantization_level", "unknown"),
            })
        nodes[HOSTNAME] = {
            "online": True,
            "model_count": len(models),
            "total_size_gb": round(total_size / (1024**3), 2),
            "models": models,
        }
    except Exception:
        # Narrowed from bare `except:`; daemon down / bad JSON => offline.
        nodes[HOSTNAME] = {"online": False, "model_count": 0, "models": []}
    # Placeholders for the other Pi nodes; they report their own inventory.
    for name in PI_NODES:
        if name != HOSTNAME and name not in nodes:
            nodes[name] = {"online": None, "model_count": 0, "note": "reported by own node"}
    # Totals only count nodes confirmed online (online=True).
    total_models = sum(n["model_count"] for n in nodes.values() if n.get("online"))
    total_size = sum(n.get("total_size_gb", 0) for n in nodes.values() if n.get("online"))
    return {"total_models": total_models, "total_size_gb": round(total_size, 2), "nodes": nodes}
# ─── HAILO-8 AI ACCELERATORS ────────────────────────────────────
def get_hailo_stats():
    """Report whether a Hailo-8 AI accelerator responds on this node."""
    output = run_cmd("hailortcli fw-control identify 2>/dev/null")
    if not output or "Board" not in output:
        return {"present": False, "node": HOSTNAME}
    return {
        "present": True,
        "tops": 26,  # per-device rating used by this fleet's dashboards
        "info": output.strip(),
        "node": HOSTNAME,
    }
# ─── GITEA ───────────────────────────────────────────────────────
def get_gitea_stats():
    """Query the Gitea instance on Octavia for repo counts and sizes.

    Returns an offline stub ({"online": False, ...}) on any failure.
    """
    try:
        url = "http://192.168.4.97:3100/api/v1/repos/search?limit=50&sort=updated"
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read().decode())
        repos = data.get("data", [])
        total_size = sum(r.get("size", 0) for r in repos)
        return {
            "online": True,
            "repo_count": len(repos),
            "total_size_kb": total_size,
            # Detail only the 20 most recently updated repos.
            "repos": [{"name": r.get("full_name", ""), "size_kb": r.get("size", 0),
                       "language": r.get("language", ""), "updated": r.get("updated_at", "")}
                      for r in repos[:20]],
        }
    except Exception:
        # Narrowed from bare `except:` — network/JSON errors mean "offline".
        return {"online": False, "repo_count": 0}
# ─── DOCKER ──────────────────────────────────────────────────────
def get_docker_stats():
    """Query each Pi node's Docker API (TCP 2375) for container inventory.

    Nodes whose Docker API is unreachable are omitted from the result.
    NOTE(review): not referenced by gather_all() in this file — confirm
    whether an external caller uses it or it is dead code.
    """
    nodes = {}
    for name, info in PI_NODES.items():
        try:
            url = f"http://{info['ip']}:2375/containers/json?all=true"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=5) as resp:
                containers = json.loads(resp.read().decode())
            running = sum(1 for c in containers if c.get("State") == "running")
            nodes[name] = {
                "total": len(containers),
                "running": running,
                # Detail only the first 10 containers to keep payloads small.
                "containers": [{"name": c.get("Names", [""])[0], "state": c.get("State", ""),
                                "image": c.get("Image", "")} for c in containers[:10]],
            }
        except Exception:
            # Narrowed from bare `except:`; unreachable daemon => skip node.
            continue
    return nodes
# ─── WIREGUARD MESH ─────────────────────────────────────────────
def get_wireguard_stats():
    """Parse `wg show` output into a peer list (public keys truncated for display)."""
    wg = run_cmd("sudo wg show 2>/dev/null")
    if not wg:
        return {"active": False}
    peers = []
    current_peer = {}
    for line in wg.split("\n"):
        line = line.strip()
        if line.startswith("peer:"):
            if current_peer:
                peers.append(current_peer)
            current_peer = {"public_key": line.split(":")[1].strip()[:16] + "..."}
        elif "endpoint:" in line:
            # BUGFIX: split once on the label colon so the whole endpoint
            # survives intact.  The old split-and-rejoin
            # (split(":")[1] + ":" + split(":")[-1]) dropped the middle of
            # IPv6 endpoints like "[2001:db8::1]:51820".
            current_peer["endpoint"] = line.split(":", 1)[1].strip()
        elif "latest handshake:" in line:
            current_peer["last_handshake"] = line.split(":", 1)[1].strip()
        elif "transfer:" in line:
            current_peer["transfer"] = line.split(":", 1)[1].strip()
        elif "allowed ips:" in line:
            current_peer["allowed_ips"] = line.split(":", 1)[1].strip()
    if current_peer:
        peers.append(current_peer)
    return {"active": True, "peer_count": len(peers), "peers": peers}
# ─── NETWORK MESH PING ──────────────────────────────────────────
def get_mesh_latency():
    """Ping every Pi node and droplet once and record reachability/latency."""
    results = {}
    targets = {name: node["ip"] for name, node in PI_NODES.items()}
    targets.update({name: node["ip"] for name, node in DROPLETS.items()})
    for name, ip in targets.items():
        if name == HOSTNAME:
            # The collector itself is trivially reachable.
            results[name] = {"reachable": True, "latency_ms": 0, "local": True}
            continue
        out = run_cmd(f"ping -c1 -W2 {ip} 2>/dev/null | grep -o 'time=[0-9.]*'")
        if not out:
            results[name] = {"reachable": False}
        else:
            latency = float(out.replace("time=", ""))
            results[name] = {"reachable": True, "latency_ms": round(latency, 2)}
    return results
# ─── CLOUDFLARE FULL INVENTORY ───────────────────────────────────
def get_cf_full_stats():
    """Full Cloudflare inventory: Pages, KV, D1, R2, Workers, DNS zones.

    Each API family is fetched independently through the 1-hour disk cache;
    a failed or unauthenticated call simply omits that section from the
    returned dict.  Returns {} when no CF_API_TOKEN is configured.
    """
    if not CF_API_TOKEN:
        return {}
    base = f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}"
    h = cf_headers()
    stats = {}
    # Pages projects — the only paginated resource here; walk all pages.
    pages_data = cached_fetch(f"{base}/pages/projects", h, ttl=3600)
    if pages_data and pages_data.get("success"):
        total = pages_data.get("result_info", {}).get("total_count", 0)
        all_projects = pages_data.get("result", [])
        total_pages = pages_data.get("result_info", {}).get("total_pages", 1)
        for pg in range(2, total_pages + 1):
            pg_data = cached_fetch(f"{base}/pages/projects?page={pg}", h, ttl=3600)
            if pg_data and pg_data.get("result"):
                all_projects.extend(pg_data["result"])
        # Flatten every project's domain list (includes *.pages.dev defaults).
        domains = []
        for p in all_projects:
            domains.extend(p.get("domains", []))
        stats["pages"] = {
            # Prefer the API's total_count; fall back to what we collected.
            "total_projects": total or len(all_projects),
            "total_domains": len(domains),
            "projects": [p["name"] for p in all_projects],
            "custom_domains": [d for d in domains if ".pages.dev" not in d],
        }
    # KV namespaces
    kv_data = cached_fetch(f"{base}/storage/kv/namespaces", h, ttl=3600)
    if kv_data and kv_data.get("success"):
        stats["kv"] = {
            "total_namespaces": len(kv_data.get("result", [])),
            "namespaces": [{"id": ns["id"], "title": ns["title"]} for ns in kv_data.get("result", [])],
        }
    # D1 databases
    d1_data = cached_fetch(f"{base}/d1/database", h, ttl=3600)
    if d1_data and d1_data.get("success"):
        stats["d1"] = {
            "total_databases": len(d1_data.get("result", [])),
            "databases": [{"uuid": db["uuid"], "name": db["name"]} for db in d1_data.get("result", [])],
        }
    # R2 buckets — note the result is nested under result.buckets.
    r2_data = cached_fetch(f"{base}/r2/buckets", h, ttl=3600)
    if r2_data and r2_data.get("success"):
        stats["r2"] = {
            "total_buckets": len(r2_data.get("result", {}).get("buckets", [])),
            "buckets": [b["name"] for b in r2_data.get("result", {}).get("buckets", [])],
        }
    # Workers scripts
    workers_data = cached_fetch(f"{base}/workers/scripts", h, ttl=3600)
    if workers_data and workers_data.get("success"):
        stats["workers"] = {
            "total_workers": len(workers_data.get("result", [])),
            "workers": [{"id": w["id"], "modified": w.get("modified_on", "")} for w in workers_data.get("result", [])],
        }
    # DNS zones — account-wide endpoint, not under the accounts/ base URL.
    # NOTE(review): only the first page (per_page=50) is fetched; accounts
    # with more than 50 zones would be under-counted — confirm zone count.
    zones_data = cached_fetch("https://api.cloudflare.com/client/v4/zones?per_page=50", h, ttl=3600)
    if zones_data and zones_data.get("success"):
        stats["dns_zones"] = {
            "total_zones": len(zones_data.get("result", [])),
            "zones": [{"name": z["name"], "status": z["status"]} for z in zones_data.get("result", [])],
        }
    return stats
# ─── GITHUB FULL STATS ──────────────────────────────────────────
def get_github_full_stats():
    """Aggregate repo counts, sizes, and language mix across all GitHub orgs.

    Uses the disk cache (default TTL for org/user lookups, 1h for repo
    pages).  "estimated_loc" is a heuristic: repo size in KiB * 25 — it is
    not a real line count.
    """
    total_pub = 0
    total_priv = 0
    org_count = 0
    org_details = []
    for org in GITHUB_ORGS:
        data = cached_fetch(f"https://api.github.com/orgs/{org}", gh_headers())
        # "public_repos" present => the org lookup succeeded.
        if data and "public_repos" in data:
            org_count += 1
            pub = data.get("public_repos", 0)
            # Private counts require an authenticated token; fall back through
            # the two field names GitHub may return, else 0.
            priv = data.get("total_private_repos", data.get("owned_private_repos", 0))
            total_pub += pub
            total_priv += priv
            org_details.append({"name": org, "public_repos": pub, "private_repos": priv, "total": pub + priv})
    # Personal account repos are folded into the public total.
    user_data = cached_fetch(f"https://api.github.com/users/{GITHUB_USER}", gh_headers())
    personal = user_data.get("public_repos", 0) if user_data else 0
    total_pub += personal
    # Repo sizes and language mix, sampled from the three largest orgs only.
    all_langs = {}
    total_size_kb = 0
    for org in ["BlackRoad-OS", "BlackRoad-AI", "BlackRoad-OS-Inc"]:
        page = 1
        # Cap at 15 pages (1500 repos per org) to bound API usage.
        while page <= 15:
            repos = cached_fetch(
                f"https://api.github.com/orgs/{org}/repos?per_page=100&page={page}&sort=size&direction=desc",
                gh_headers(), ttl=3600
            )
            if not repos or not isinstance(repos, list) or len(repos) == 0:
                break
            for r in repos:
                sz = r.get("size", 0)  # GitHub reports size in KiB
                total_size_kb += sz
                lang = r.get("language")
                if lang:
                    all_langs[lang] = all_langs.get(lang, 0) + sz
            if len(repos) < 100:
                break  # short page => last page
            page += 1
    # Top 15 languages by cumulative repo size, largest first.
    sorted_langs = sorted(all_langs.items(), key=lambda x: -x[1])
    lang_list = [{"language": l, "size_kb": s, "estimated_loc": s * 25} for l, s in sorted_langs[:15]]
    return {
        "total_repos": total_pub + total_priv,
        "public_repos": total_pub,
        "private_repos": total_priv,
        "personal_repos": personal,
        "org_count": org_count,
        "orgs": sorted(org_details, key=lambda x: -x["total"]),
        "languages": lang_list,
        "total_size_kb": total_size_kb,
        "estimated_loc": total_size_kb * 25,  # heuristic, see docstring
    }
# ─── GATHER ALL ──────────────────────────────────────────────────
def gather_all():
    """Run every collector and assemble the full telemetry payload.

    Collectors run sequentially, so total wall time is the sum of their
    timeouts in the worst case; callers cache the result (see get_or_refresh).
    """
    system = get_system_stats()
    github = get_github_full_stats()
    cf = get_cf_full_stats()
    ollama = get_ollama_stats()
    hailo = get_hailo_stats()
    gitea = get_gitea_stats()
    mesh = get_mesh_latency()
    wireguard = get_wireguard_stats()
    # Pull out Cloudflare sections (each may be missing if its API call failed).
    cf_pages = cf.get("pages", {})
    cf_kv = cf.get("kv", {})
    cf_d1 = cf.get("d1", {})
    cf_r2 = cf.get("r2", {})
    cf_workers = cf.get("workers", {})
    cf_dns = cf.get("dns_zones", {})
    # Count only Pi nodes (not droplets) that answered the mesh ping.
    nodes_online = sum(1 for n, d in mesh.items() if d.get("reachable") and n in PI_NODES)
    return {
        "_generated": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "_cache_ttl_seconds": CACHE_TTL,
        "_served_by": HOSTNAME,
        "_version": "2.0-fleet",
        "github": github,
        "cloudflare": {
            "total_projects": cf_pages.get("total_projects", 0),
            "total_domains": cf_pages.get("total_domains", 0),
            "custom_domains": len(cf_pages.get("custom_domains", [])),
            "projects": cf_pages.get("projects", []),
            "kv_namespaces": cf_kv.get("total_namespaces", 0),
            "d1_databases": cf_d1.get("total_databases", 0),
            "r2_buckets": cf_r2.get("total_buckets", 0),
            "workers": cf_workers.get("total_workers", 0),
            "dns_zones": cf_dns.get("total_zones", 0),
            "worker_list": [w["id"] for w in cf_workers.get("workers", [])],
            "zone_list": [z["name"] for z in cf_dns.get("zones", [])],
        },
        "fleet": {
            "nodes_online": nodes_online,
            "total_nodes": len(PI_NODES),
            "mesh_latency": mesh,
            "wireguard": wireguard,
            "collector_node": system,
        },
        "ai": {
            "ollama": ollama,
            "hailo": {
                # NOTE(review): accelerator count/TOPS and host list are
                # hard-coded fleet facts, not measured — update by hand if
                # hardware changes; only local_status is probed live.
                "total_accelerators": 2,
                "total_tops": 52,
                "nodes": ["cecilia", "octavia"],
                "local_status": hailo,
            },
            "total_models": ollama.get("total_models", 0),
            "total_model_size_gb": ollama.get("total_size_gb", 0),
        },
        "gitea": gitea,
        # Flat summary block consumed by dashboards.
        "summary": {
            "repos": github["total_repos"],
            "orgs": github["org_count"],
            "public_repos": github["public_repos"],
            "cloudflare_projects": cf_pages.get("total_projects", 0),
            "cloudflare_domains": cf_pages.get("total_domains", 0),
            "estimated_loc": github["estimated_loc"],
            "kv_namespaces": cf_kv.get("total_namespaces", 0),
            "d1_databases": cf_d1.get("total_databases", 0),
            "r2_buckets": cf_r2.get("total_buckets", 0),
            "workers": cf_workers.get("total_workers", 0),
            "dns_zones": cf_dns.get("total_zones", 0),
            "pi_nodes_online": nodes_online,
            "ollama_models": ollama.get("total_models", 0),
            "ollama_size_gb": ollama.get("total_size_gb", 0),
            "hailo_tops": 52,  # hard-coded, matches ai.hailo.total_tops above
            "gitea_repos": gitea.get("repo_count", 0),
        },
    }
# ─── SERVER ──────────────────────────────────────────────────────
# In-memory copy of the last full payload, shared by all request threads.
_stats_cache = {"data": None, "time": 0}
_stats_lock = threading.Lock()

def get_or_refresh():
    """Return the cached payload, refreshing it when older than CACHE_TTL.

    BUGFIX: the lock is now held across the refresh.  The old code released
    the lock before calling gather_all(), so several concurrent requests
    arriving at expiry each ran the full (slow, network-heavy) collection.
    Requests now serialize on a refresh; /health is unaffected since it
    never calls this.
    """
    with _stats_lock:
        if _stats_cache["data"] and time.time() - _stats_cache["time"] < CACHE_TTL:
            return _stats_cache["data"]
        stats = gather_all()
        _stats_cache["data"] = stats
        _stats_cache["time"] = time.time()
        return stats
class Handler(http.server.BaseHTTPRequestHandler):
    """HTTP front-end for the telemetry cache.

    GET /health      -> liveness probe (never triggers a stats refresh)
    GET / or /stats* -> full JSON payload (may block while refreshing)
    anything else    -> 404
    """

    def do_GET(self):
        if self.path == "/health":
            payload = json.dumps({"status": "ok", "node": HOSTNAME, "version": "2.0-fleet"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(payload)
        elif self.path == "/" or self.path.startswith("/stats"):
            payload = json.dumps(get_or_refresh(), indent=2).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Cache-Control", "public, max-age=300")
            self.send_header("X-Served-By", HOSTNAME)
            self.send_header("X-Fleet-Version", "2.0")
            self.end_headers()
            self.wfile.write(payload)
        else:
            self.send_response(404)
            self.end_headers()

    def do_OPTIONS(self):
        # CORS preflight so browser dashboards can read the JSON.
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def log_message(self, format, *args):
        # Suppress per-request access logging.
        pass
if __name__ == "__main__":
    # Warm the cache in the background so the first /stats request does not
    # pay the full multi-API collection cost.
    threading.Thread(target=get_or_refresh, daemon=True).start()
    server = http.server.HTTPServer(("0.0.0.0", PORT), Handler)
    print(f"[{HOSTNAME}] Fleet telemetry v2.0 running on port {PORT}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Clean shutdown on Ctrl-C instead of a traceback; releases the port.
        server.server_close()