Add 14 Python infra modules: codex, cluster, monitoring, DNS

- Codex scanner, search, oracle, and extract for component indexing
- Infra pager for on-call alerting
- Cluster coordinator, worker, and dashboard
- Fleet monitor for device health
- DNS system management
- GitHub Actions health analyzer
- Memory index daemon for persistent indexing
- Memory search for fast lookups
- RoadChain model verification

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexa Amundson
2026-02-20 22:57:10 -06:00
parent 635004a4a5
commit ccd429b89a
14 changed files with 4147 additions and 0 deletions

View File

@@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
BlackRoad Cluster Coordinator
Distributes tasks across all worker nodes
"""
import requests
import json
import time
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
# Cluster node configurations
# Static roster of worker nodes the coordinator probes.  'power' looks like a
# relative capability weight (the visible code only relies on list order for
# the reduce node, so keep the strongest node first); 'arch' is informational.
NODES = [
    {'name': 'octavia', 'host': '192.168.4.74', 'port': 8888, 'power': 10, 'arch': 'aarch64'},
    {'name': 'aria', 'host': '192.168.4.64', 'port': 8888, 'power': 5, 'arch': 'aarch64'},
    {'name': 'lucidia', 'host': '192.168.4.38', 'port': 8888, 'power': 5, 'arch': 'aarch64'},
]
class BlackRoadCluster:
    """Distributed cluster coordinator.

    Discovers online worker nodes (see NODES) over HTTP and fans tasks out to
    them in parallel.  Each worker is expected to expose:
      GET  /status   -> JSON node status
      POST /execute  -> runs a task dict, returns a JSON result dict
    """

    def __init__(self):
        self.nodes = NODES
        self.available_nodes = []
        self.discover_nodes()

    def discover_nodes(self):
        """Probe every configured node and record which ones are online.

        Each online node gets its /status payload attached under
        ``node['status']``.  Returns (and stores) the list of online nodes.
        """
        print("🔍 Discovering cluster nodes...")
        self.available_nodes = []
        for node in self.nodes:
            try:
                url = f"http://{node['host']}:{node['port']}/status"
                response = requests.get(url, timeout=2)
                if response.status_code == 200:
                    node['status'] = response.json()
                    self.available_nodes.append(node)
                    print(f"{node['name']} ({node['host']}) - ONLINE")
            except Exception:
                # Connection refused / timeout -> treat the node as offline.
                print(f"{node['name']} ({node['host']}) - OFFLINE")
        print(f"\n📊 Cluster: {len(self.available_nodes)}/{len(self.nodes)} nodes online\n")
        return self.available_nodes

    def execute_task(self, node, task):
        """Run a single task dict on one node via POST /execute.

        Returns the worker's JSON result, or a {'success': False, ...} dict
        on HTTP or transport errors (never raises).
        """
        try:
            url = f"http://{node['host']}:{node['port']}/execute"
            response = requests.post(url, json=task, timeout=300)
            if response.status_code == 200:
                return response.json()
            return {'success': False, 'error': f"HTTP {response.status_code}", 'node': node['name']}
        except Exception as e:
            return {'success': False, 'error': str(e), 'node': node['name']}

    def distribute_parallel(self, tasks):
        """Distribute tasks round-robin across available nodes in parallel.

        Mutates each task dict by assigning a unique ``task_id``.
        Returns the list of per-task result dicts (submission order).
        """
        # Bug fix: guard the degenerate cases.  Previously an empty node list
        # raised ValueError from ThreadPoolExecutor(max_workers=0) and an
        # empty task list raised ZeroDivisionError in the summary below.
        if not self.available_nodes:
            print("❌ No nodes available to run tasks on.")
            return []
        if not tasks:
            print("No tasks to distribute.")
            return []
        print(f"🚀 Distributing {len(tasks)} tasks across {len(self.available_nodes)} nodes...")
        results = []
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=len(self.available_nodes)) as executor:
            # Map tasks to nodes (round-robin)
            futures = []
            for i, task in enumerate(tasks):
                node = self.available_nodes[i % len(self.available_nodes)]
                task['task_id'] = f"task_{i}_{int(time.time())}"
                future = executor.submit(self.execute_task, node, task)
                futures.append((future, node['name'], task['task_id']))
            # Collect results in submission order
            for future, node_name, task_id in futures:
                try:
                    result = future.result()
                    results.append(result)
                    # NOTE(review): both status glyphs are empty strings here —
                    # success/failure markers appear to have been lost upstream.
                    status = "" if result.get('success') else ""
                    elapsed = result.get('elapsed', 0)
                    print(f" {status} {node_name}: {task_id} ({elapsed:.2f}s)")
                except Exception as e:
                    print(f"{node_name}: {task_id} - {str(e)}")
                    results.append({'success': False, 'error': str(e), 'node': node_name})
        total_time = time.time() - start_time
        successful = sum(1 for r in results if r.get('success'))
        print(f"\n📈 Summary:")
        print(f" Total tasks: {len(tasks)}")
        print(f" Successful: {successful}/{len(tasks)}")
        print(f" Total time: {total_time:.2f}s")
        print(f" Avg per task: {total_time/len(tasks):.2f}s")
        print()
        return results

    def map_reduce(self, data_chunks, map_code, reduce_code=None):
        """Map/Reduce operation across the cluster.

        Each chunk is spliced into a generated script via its repr(), so
        chunks must be plain Python literals; ``map_code`` should print its
        result to stdout.  With ``reduce_code``, the collected stdout strings
        are fed to one reduce task and its result dict is returned; otherwise
        the list of map results is returned.
        """
        print(f"🗺️ Map/Reduce: {len(data_chunks)} chunks across {len(self.available_nodes)} nodes")
        # Map phase: one generated script per chunk.
        map_tasks = []
        for chunk in data_chunks:
            map_tasks.append({
                'type': 'python',
                'code': f"""
data = {chunk}
{map_code}
"""
            })
        map_results = self.distribute_parallel(map_tasks)
        # Reduce phase (if provided)
        if reduce_code:
            # Failed map tasks are silently dropped from the reduce input.
            outputs = [r['output'] for r in map_results if r.get('success')]
            reduce_task = {
                'type': 'python',
                'code': f"""
results = {outputs}
{reduce_code}
"""
            }
            # Run reduce on the first node (NODES is ordered strongest-first).
            reduce_result = self.execute_task(self.available_nodes[0], reduce_task)
            return reduce_result
        return map_results

    def cluster_status(self):
        """Print a fresh /status snapshot for every online node."""
        print("📊 Cluster Status")
        print("=" * 70)
        for node in self.available_nodes:
            try:
                url = f"http://{node['host']}:{node['port']}/status"
                response = requests.get(url, timeout=2)
                status = response.json()
                print(f"\n🖥️ {node['name'].upper()} ({node['host']})")
                print(f" Architecture: {status.get('arch')}")
                print(f" CPUs: {status.get('cpu_count')}")
                print(f" Load: {status.get('load')}")
                print(f" Temperature: {status.get('temp')}")
                print(f" Tasks completed: {status.get('tasks_completed')}")
            except Exception as e:
                print(f"\n{node['name']}: {str(e)}")
        print()
def demo_distributed_compute():
    """Demo: fan eight 500x500 matrix multiplications out to the cluster."""
    cluster = BlackRoadCluster()
    if not cluster.available_nodes:
        print("❌ No nodes available!")
        return
    print("🧮 Demo: Distributed Matrix Operations")
    print("=" * 50)
    # One dict per task: the coordinator attaches a unique task_id to each.
    sizes = [500] * 8  # 8 tasks
    matrix_tasks = []
    for size in sizes:
        code = f"""
a = np.random.rand({size}, {size})
b = np.random.rand({size}, {size})
start = time.time()
c = np.dot(a, b)
elapsed = time.time() - start
print(f"Matrix {size}x{size}: {{elapsed:.4f}}s")
"""
        matrix_tasks.append({'type': 'numpy', 'code': code})
    results = cluster.distribute_parallel(matrix_tasks)
    return results
def demo_map_reduce():
    """Demo: sum 0..999999 by mapping per-chunk sums across the cluster."""
    cluster = BlackRoadCluster()
    if not cluster.available_nodes:
        print("❌ No nodes available!")
        return
    print("🗺️ Demo: Map/Reduce Sum")
    print("=" * 50)
    # Partition 0..999999 into four equal chunks.
    step = 250000
    chunks = [list(range(lo, lo + step)) for lo in range(0, 1000000, step)]
    map_code = "result = sum(data); print(result)"
    reduce_code = "total = sum(int(r.strip()) for r in results if r.strip()); print(f'Total sum: {total}')"
    result = cluster.map_reduce(chunks, map_code, reduce_code)
    print(f"\n📊 Final result: {result.get('output') if result else 'Failed'}")
if __name__ == '__main__':
    import sys
    # Constructing the coordinator probes every configured node, so even the
    # 'status' subcommand reflects a fresh discovery pass.  Note the demo
    # functions build their own BlackRoadCluster, so discovery runs twice
    # for 'demo' and 'mapreduce'.
    cluster = BlackRoadCluster()
    if len(sys.argv) > 1:
        if sys.argv[1] == 'status':
            cluster.cluster_status()
        elif sys.argv[1] == 'demo':
            demo_distributed_compute()
        elif sys.argv[1] == 'mapreduce':
            demo_map_reduce()
        # Unknown subcommands fall through silently (no usage message).
    else:
        print("BlackRoad Cluster Coordinator")
        print("\nUsage:")
        print(" python3 blackroad-cluster-coordinator.py status - Show cluster status")
        print(" python3 blackroad-cluster-coordinator.py demo - Run distributed compute demo")
        print(" python3 blackroad-cluster-coordinator.py mapreduce - Run map/reduce demo")

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""
BlackRoad Pi Cluster Dashboard
Monitors: alice@alice, shellfish, lucidia@lucidia, aria64, pi@192.168.4.74 (octavia)
"""
import subprocess
import time
import sys
from datetime import datetime
# BlackRoad color palette
# 24-bit ANSI escape sequences; requires a truecolor-capable terminal.
COLORS = {
    'red': '\033[38;2;255;0;102m',
    'orange': '\033[38;2;255;107;0m',
    'yellow': '\033[38;2;255;157;0m',
    'green': '\033[38;2;0;255;0m',
    'blue': '\033[38;2;0;102;255m',
    'purple': '\033[38;2;119;0;255m',
    'reset': '\033[0m',
    'bold': '\033[1m',
}
# Hosts to monitor.  'host' is passed verbatim to ssh, so entries may be
# ~/.ssh/config aliases or user@host strings.
NODES = [
    {'host': 'alice@alice', 'name': 'alice', 'arch': 'aarch64'},
    {'host': 'shellfish', 'name': 'shellfish', 'arch': 'x86_64'},
    {'host': 'lucidia@lucidia', 'name': 'lucidia', 'arch': 'aarch64'},
    {'host': 'aria64', 'name': 'aria', 'arch': 'aarch64'},
    {'host': 'pi@192.168.4.74', 'name': 'octavia', 'arch': 'aarch64'},
]
def run_ssh(host, command, timeout=2):
    """Execute one command on a remote host over SSH.

    Returns the stripped stdout on a zero exit status; None on a non-zero
    exit, a timeout, or any other failure (best-effort probe).
    """
    ssh_cmd = [
        'ssh', '-o', 'ConnectTimeout=2', '-o', 'StrictHostKeyChecking=no',
        host, command,
    ]
    try:
        proc = subprocess.run(
            ssh_cmd, capture_output=True, text=True, timeout=timeout)
    except Exception:
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip()
def get_node_info(node):
    """Collect health metrics from one node over SSH.

    Returns a dict of display-ready strings.  If the initial uptime probe
    fails, 'online' is False and every other field keeps its default.
    """
    host = node['host']
    info = {
        'online': False,
        'uptime': 'N/A',
        'temp': 'N/A',
        'cpu_freq': 'N/A',
        'mem_usage': 'N/A',
        'load': 'N/A',
        'pironman': False,
        'octokit': False,
    }
    # A single `uptime` call doubles as the liveness probe.
    uptime = run_ssh(host, 'uptime')
    if not uptime:
        return info
    info['online'] = True
    if 'up' in uptime:
        # Everything between 'up' and the first comma is the uptime string.
        info['uptime'] = uptime.split('up')[1].split(',')[0].strip()
    if 'load average:' in uptime:
        # First of the three load-average figures.
        info['load'] = uptime.split('load average:')[1].strip().split(',')[0].strip()
    if node['arch'] == 'aarch64':
        # Pi-specific probes via vcgencmd (silently absent elsewhere).
        temp = run_ssh(host, 'vcgencmd measure_temp 2>/dev/null')
        if temp:
            info['temp'] = temp.replace('temp=', '')
        freq = run_ssh(host, 'vcgencmd measure_clock arm 2>/dev/null')
        if freq and 'frequency' in freq:
            mhz = int(freq.split('=')[1]) / 1_000_000
            info['cpu_freq'] = f"{mhz:.0f} MHz"
    mem = run_ssh(host, "free -h | grep Mem | awk '{print $3\"/\"$2}'")
    if mem:
        info['mem_usage'] = mem
    # Feature detection: Pironman case tooling and an ~/octokit checkout.
    info['pironman'] = bool(run_ssh(host, 'which pironman5 2>/dev/null'))
    info['octokit'] = run_ssh(host, 'test -d ~/octokit && echo yes || echo no') == 'yes'
    return info
def print_dashboard():
    """Render one full snapshot of every node's status to stdout."""
    bold = COLORS['bold']
    reset = COLORS['reset']
    print(f"\n{bold}🖤🛣️ BlackRoad Pi Cluster Dashboard{reset}")
    print(f"{COLORS['purple']}{'='*80}{reset}")
    print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    for node in NODES:
        info = get_node_info(node)
        # Colored status dot: green when reachable, red otherwise.
        dot_color = COLORS['green'] if info['online'] else COLORS['red']
        status = f"{dot_color}{reset}"
        print(f"{status} {bold}{node['name']:<12}{reset} "
              f"({node['arch']:<8}) @ {node['host']}")
        if not info['online']:
            print(f" {COLORS['red']}OFFLINE{reset}")
            print()
            continue
        print(f" {COLORS['blue']}Uptime:{reset} {info['uptime']:<15} "
              f"{COLORS['orange']}Load:{reset} {info['load']:<8} "
              f"{COLORS['yellow']}Temp:{reset} {info['temp']:<10}")
        print(f" {COLORS['purple']}CPU:{reset} {info['cpu_freq']:<15} "
              f"{COLORS['green']}Mem:{reset} {info['mem_usage']:<15}")
        # Feature badges for optional tooling detected on the node.
        badges = [label for present, label in (
            (info['pironman'], f"{COLORS['green']}[Pironman]{reset}"),
            (info['octokit'], f"{COLORS['blue']}[Octokit]{reset}"),
        ) if present]
        if badges:
            print(f" {' '.join(badges)}")
        print()
def main():
    """Refresh the dashboard every five seconds until Ctrl+C."""
    while True:
        try:
            subprocess.run(['clear'])
            print_dashboard()
            print(f"\n{COLORS['purple']}Press Ctrl+C to exit{COLORS['reset']}")
            time.sleep(5)
        except KeyboardInterrupt:
            print(f"\n\n{COLORS['purple']}Dashboard stopped.{COLORS['reset']}\n")
            sys.exit(0)
if __name__ == '__main__':
    main()

205
scripts/python/cluster-worker.py Executable file
View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""
BlackRoad Cluster Worker Node
Runs on each Pi to execute distributed tasks
"""
import socket
import time
import json
import sys
import subprocess
import platform
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
import hashlib
# Node configuration
NODE_NAME = socket.gethostname()  # advertised in /status and result payloads
NODE_PORT = 8888                  # HTTP port this worker listens on
NODE_ARCH = platform.machine()    # e.g. 'aarch64' or 'x86_64'
# Task execution results
# task_results maps task_id -> full result dict and is never trimmed
# (grows without bound); task_history is capped at the last 100 entries.
task_results = {}
task_history = []
class WorkerHandler(BaseHTTPRequestHandler):
    """HTTP handler for receiving and executing tasks.

    Endpoints:
      GET  /status       -> node metadata plus shell-derived metrics
      GET  /result/<id>  -> stored result for a finished task (404 if unknown)
      GET  /history      -> rolling summary of the last 100 tasks
      POST /execute      -> run a task dict {'type', 'code', 'task_id'?}

    NOTE(review): /execute runs arbitrary Python/bash from the request body
    with no authentication — only safe on a trusted private network.
    """

    def log_message(self, format, *args):
        """Suppress default logging"""
        pass

    def do_GET(self):
        """Handle status requests"""
        if self.path == '/status':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            # Metrics come from shell one-liners; subprocess.getoutput never
            # raises (it returns the error text instead).
            status = {
                'node': NODE_NAME,
                'arch': NODE_ARCH,
                'port': NODE_PORT,
                # NOTE(review): this is the current wall-clock timestamp,
                # not an uptime duration — confirm consumers expect that.
                'uptime': time.time(),
                'tasks_completed': len(task_history),
                'cpu_count': subprocess.getoutput('nproc'),
                # '\K' is a PCRE construct inside the shell string; Python
                # passes the invalid escape through literally (with a
                # DeprecationWarning on newer interpreters).
                'load': subprocess.getoutput('uptime | grep -oP "load average: \K.*"'),
                'memory': subprocess.getoutput('free -h | grep Mem'),
                'temp': subprocess.getoutput('vcgencmd measure_temp 2>/dev/null || echo "N/A"'),
            }
            self.wfile.write(json.dumps(status).encode())
        elif self.path.startswith('/result/'):
            # Look up a previously stored task result by id.
            task_id = self.path.split('/')[-1]
            if task_id in task_results:
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(task_results[task_id]).encode())
            else:
                self.send_response(404)
                self.end_headers()
        elif self.path == '/history':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps(task_history).encode())
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        """Handle task execution requests"""
        if self.path == '/execute':
            content_length = int(self.headers['Content-Length'])
            task_data = json.loads(self.rfile.read(content_length).decode())
            # Fall back to a time-derived id when the coordinator did not
            # assign one.
            task_id = task_data.get('task_id', hashlib.md5(str(time.time()).encode()).hexdigest())
            task_type = task_data.get('type', 'python')
            task_code = task_data.get('code', '')
            print(f"[{NODE_NAME}] Executing task {task_id} (type: {task_type})")
            start_time = time.time()
            try:
                if task_type == 'python':
                    # Execute Python code
                    result = subprocess.run(
                        ['python3', '-c', task_code],
                        capture_output=True,
                        text=True,
                        timeout=300
                    )
                    output = result.stdout
                    error = result.stderr
                    success = result.returncode == 0
                elif task_type == 'bash':
                    # Execute bash command
                    result = subprocess.run(
                        task_code,
                        shell=True,
                        capture_output=True,
                        text=True,
                        timeout=300
                    )
                    output = result.stdout
                    error = result.stderr
                    success = result.returncode == 0
                elif task_type == 'numpy':
                    # Special numpy task
                    # Prepend the imports the coordinator's numpy snippets rely on.
                    numpy_code = f"""
import numpy as np
import time
{task_code}
"""
                    result = subprocess.run(
                        ['python3', '-c', numpy_code],
                        capture_output=True,
                        text=True,
                        timeout=300
                    )
                    output = result.stdout
                    error = result.stderr
                    success = result.returncode == 0
                else:
                    output = ""
                    error = f"Unknown task type: {task_type}"
                    success = False
                elapsed = time.time() - start_time
                task_result = {
                    'task_id': task_id,
                    'node': NODE_NAME,
                    'success': success,
                    'output': output,
                    'error': error,
                    'elapsed': elapsed,
                    'timestamp': time.time()
                }
                # Store the full result for later GET /result/<id> lookups.
                task_results[task_id] = task_result
                task_history.append({
                    'task_id': task_id,
                    'type': task_type,
                    'elapsed': elapsed,
                    'success': success,
                    'timestamp': time.time()
                })
                # Keep only last 100 results
                # (only the history list is trimmed; task_results grows unbounded).
                if len(task_history) > 100:
                    task_history.pop(0)
                print(f"[{NODE_NAME}] Task {task_id} completed in {elapsed:.2f}s (success={success})")
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(task_result).encode())
            except Exception as e:
                # Subprocess timeout or any unexpected failure -> HTTP 500
                # with the same result-dict shape (not stored in task_results).
                error_result = {
                    'task_id': task_id,
                    'node': NODE_NAME,
                    'success': False,
                    'output': '',
                    'error': str(e),
                    'elapsed': time.time() - start_time,
                    'timestamp': time.time()
                }
                self.send_response(500)
                self.send_header('Content-type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(error_result).encode())
        else:
            self.send_response(404)
            self.end_headers()
def main():
    """Start the HTTP task server for this worker node (blocks forever)."""
    banner = (
        f"🖤🛣️ BlackRoad Cluster Worker: {NODE_NAME}",
        f"Architecture: {NODE_ARCH}",
        f"Listening on port {NODE_PORT}",
        "=" * 50,
    )
    for line in banner:
        print(line)
    server = HTTPServer(('0.0.0.0', NODE_PORT), WorkerHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print(f"\n[{NODE_NAME}] Shutting down...")
        server.shutdown()
if __name__ == '__main__':
    main()

283
scripts/python/codex-extract.py Executable file
View File

@@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""
BlackRoad Library Component Extractor
Extract and copy components from the library for easy integration.
"""
import sqlite3
from pathlib import Path
from typing import Dict, Optional
import json
import shutil
class ComponentExtractor:
    """Extract components from the library.

    Reads component metadata from the SQLite index built by the scanner and
    pulls the referenced line ranges out of the original source files.
    """

    def __init__(self, library_path: str = "~/blackroad-code-library"):
        """Open an existing library; raises FileNotFoundError if the index
        database is missing (run the scanner first)."""
        self.library_path = Path(library_path).expanduser()
        self.db_path = self.library_path / "index" / "components.db"
        if not self.db_path.exists():
            raise FileNotFoundError(f"Library database not found at {self.db_path}")

    def get_component(self, component_id: str) -> Optional[Dict]:
        """Return the component row as a dict, or None if not found."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM components WHERE id = ?", (component_id,))
            row = cursor.fetchone()
        finally:
            # Bug fix: previously the connection leaked if the query raised.
            conn.close()
        return dict(row) if row else None

    def extract_code(self, component_id: str) -> str:
        """Return the component's source lines, or an error-message string.

        Errors (missing component, missing file, read failure) are reported
        in-band as strings rather than raised, matching the CLI's use.
        """
        comp = self.get_component(component_id)
        if not comp:
            return f"Component {component_id} not found"
        file_path = Path(comp['file_path'])
        if not file_path.exists():
            return f"Source file not found: {file_path}"
        try:
            with open(file_path, 'r') as f:
                lines = f.readlines()
            start = comp['start_line'] - 1  # DB line numbers are 1-indexed
            end = comp['end_line']
            return ''.join(lines[start:end])
        except Exception as e:
            return f"Error reading file: {e}"

    def extract_component_with_context(self, component_id: str) -> Dict:
        """Extract component code plus metadata and usage instructions.

        Returns {'error': ...} when the component id is unknown.
        """
        comp = self.get_component(component_id)
        if not comp:
            return {"error": f"Component {component_id} not found"}
        code = self.extract_code(component_id)
        tags = json.loads(comp['tags'])
        deps = json.loads(comp['dependencies'])
        return {
            'metadata': {
                'id': comp['id'],
                'name': comp['name'],
                'type': comp['type'],
                'language': comp['language'],
                'framework': comp.get('framework'),
                'repo': comp['repo'],
                'file_path': comp['file_path'],
                'location': f"{comp['file_path']}:{comp['start_line']}-{comp['end_line']}",
                'quality_score': comp['quality_score'],
                'tags': tags,
                'dependencies': deps,
            },
            'code': code,
            'usage_instructions': self._generate_usage_instructions(comp, deps),
        }

    def _generate_usage_instructions(self, comp: Dict, deps: list) -> str:
        """Build a markdown how-to for the component (install command shows
        at most the first 10 dependencies)."""
        instructions = f"""
# How to Use: {comp['name']}
## 1. Copy the code
The full component code is provided below.
## 2. Install dependencies
"""
        if deps:
            if comp['language'] in ['typescript', 'javascript']:
                instructions += "```bash\n"
                instructions += "npm install " + " ".join(deps[:10]) + "\n"
                instructions += "```\n"
            elif comp['language'] == 'python':
                instructions += "```bash\n"
                instructions += "pip install " + " ".join(deps[:10]) + "\n"
                instructions += "```\n"
        else:
            instructions += "No external dependencies required.\n"
        instructions += f"""
## 3. Integration
- File location: `{comp['file_path']}`
- Type: {comp['type']}
- Framework: {comp.get('framework') or 'N/A'}
## 4. Adjust imports
Update import paths to match your project structure.
"""
        return instructions

    def copy_to_file(self, component_id: str, output_path: str):
        """Write the annotated component (metadata header + code) to a file."""
        output_file = Path(output_path).expanduser()
        extraction = self.extract_component_with_context(component_id)
        if 'error' in extraction:
            print(f"{extraction['error']}")
            return
        # Create output content
        content = f"""# Component: {extraction['metadata']['name']}
# Source: {extraction['metadata']['location']}
# Quality: {extraction['metadata']['quality_score']}/10
# Tags: {', '.join(extraction['metadata']['tags'])}
{extraction['usage_instructions']}
# ============================================================================
# CODE
# ============================================================================
{extraction['code']}
"""
        output_file.write_text(content)
        print(f"✅ Component extracted to: {output_file}")
        print(f" Name: {extraction['metadata']['name']}")
        print(f" Type: {extraction['metadata']['type']}")
        print(f" Quality: {extraction['metadata']['quality_score']}/10")

    def create_component_package(self, component_id: str, output_dir: str):
        """Create a directory with the component code, metadata JSON, and README."""
        output_path = Path(output_dir).expanduser()
        output_path.mkdir(parents=True, exist_ok=True)
        extraction = self.extract_component_with_context(component_id)
        if 'error' in extraction:
            print(f"{extraction['error']}")
            return
        meta = extraction['metadata']
        # Save component code with an extension matching its language.
        lang_ext = {
            'python': '.py',
            'typescript': '.ts',
            'javascript': '.js',
        }.get(meta['language'], '.txt')
        code_file = output_path / f"{meta['name']}{lang_ext}"
        code_file.write_text(extraction['code'])
        # Save metadata
        metadata_file = output_path / "component-info.json"
        metadata_file.write_text(json.dumps(meta, indent=2))
        # Bug fix: the old code embedded a '\n' escape inside an f-string
        # expression, a SyntaxError on Python < 3.12 (PEP 701).  Build the
        # dependency list outside the f-string instead.
        deps_md = '- ' + '\n- '.join(meta['dependencies']) if meta['dependencies'] else 'None'
        # Save README
        readme_file = output_path / "README.md"
        readme_file.write_text(f"""# {meta['name']}
**Type:** {meta['type']}
**Language:** {meta['language']}
**Framework:** {meta.get('framework') or 'N/A'}
**Quality Score:** {meta['quality_score']}/10
## Source
- **Repository:** {meta['repo']}
- **File:** {meta['file_path']}
- **Lines:** {meta['location']}
## Tags
{', '.join(meta['tags'])}
## Dependencies
{deps_md}
{extraction['usage_instructions']}
## Files in Package
- `{code_file.name}` - Component code
- `component-info.json` - Metadata
- `README.md` - This file
""")
        print(f"✅ Component package created: {output_path}")
        print(f" Files:")
        print(f" - {code_file.name}")
        print(f" - component-info.json")
        print(f" - README.md")
def main():
    """CLI interface: extract, package, or display a single component."""
    import argparse
    parser = argparse.ArgumentParser(description='Extract components from library')
    parser.add_argument('component_id', help='Component ID to extract')
    parser.add_argument('--library', default='~/blackroad-code-library', help='Library path')
    parser.add_argument('--output', '-o', help='Output file path')
    parser.add_argument('--package', '-p', help='Create full package in directory')
    parser.add_argument('--print', action='store_true', help='Print code to stdout')
    args = parser.parse_args()
    extractor = ComponentExtractor(args.library)
    # Mode precedence: --print, then --package, then --output, then the
    # default human-readable dump.
    if args.print:
        print(extractor.extract_code(args.component_id))
        return
    if args.package:
        extractor.create_component_package(args.component_id, args.package)
        return
    if args.output:
        extractor.copy_to_file(args.component_id, args.output)
        return
    extraction = extractor.extract_component_with_context(args.component_id)
    if 'error' in extraction:
        print(f"{extraction['error']}")
        return
    meta = extraction['metadata']
    print(f"""
{'='*70}
Component: {meta['name']}
{'='*70}
Type: {meta['type']}
Language: {meta['language']}
Framework: {meta.get('framework') or 'N/A'}
Quality: {meta['quality_score']}/10
Location: {meta['location']}
Tags: {', '.join(meta['tags'])}
Dependencies: {', '.join(meta['dependencies']) if meta['dependencies'] else 'None'}
{extraction['usage_instructions']}
{'='*70}
CODE
{'='*70}
{extraction['code']}
{'='*70}
💡 To save: --output <file>
💡 To create package: --package <directory>
💡 To print only code: --print
""")
if __name__ == '__main__':
    main()

315
scripts/python/codex-oracle.py Executable file
View File

@@ -0,0 +1,315 @@
#!/usr/bin/env python3
"""
BlackRoad Library Agent API
Simple API interface for agents to query and use the code library.
"""
import sys
import sqlite3
from pathlib import Path
from typing import List, Dict, Optional
import json
from datetime import datetime, timedelta
import re
# Inline the LibrarySearch class
class LibrarySearch:
    """Read-only query interface over the component library's SQLite index."""

    def __init__(self, library_path: str = "~/blackroad-code-library"):
        self.library_path = Path(library_path).expanduser()
        self.db_path = self.library_path / "index" / "components.db"
        if not self.db_path.exists():
            raise FileNotFoundError(f"Library database not found at {self.db_path}. Run scanner first.")

    def search(self, query: str, filters: Optional[Dict] = None, limit: int = 10) -> List[Dict]:
        """Substring-match `query` against name/tags/description/type,
        optionally narrowed by filters; best-quality results first."""
        filters = filters or {}
        like = f"%{query}%"
        sql = """
        SELECT * FROM components
        WHERE (
            name LIKE ? OR
            tags LIKE ? OR
            description LIKE ? OR
            type LIKE ?
        )
        """
        params = [like, like, like, like]
        # Exact-match column filters.
        for key, clause in (
            ('language', " AND language = ?"),
            ('type', " AND type = ?"),
            ('framework', " AND framework = ?"),
        ):
            if key in filters:
                sql += clause
                params.append(filters[key])
        if 'min_quality' in filters:
            sql += " AND quality_score >= ?"
            params.append(filters['min_quality'])
        if 'max_age_days' in filters:
            # Recently used OR recently created both count as "fresh".
            cutoff = (datetime.now() - timedelta(days=filters['max_age_days'])).isoformat()
            sql += " AND (last_used_at >= ? OR created_at >= ?)"
            params.extend([cutoff, cutoff])
        if 'repo' in filters:
            sql += " AND repo = ?"
            params.append(filters['repo'])
        sql += " ORDER BY quality_score DESC, created_at DESC LIMIT ?"
        params.append(limit)
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        rows = conn.execute(sql, params).fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def search_by_tag(self, tag: str, limit: int = 10) -> List[Dict]:
        """Convenience wrapper: full-text search using the tag as the query."""
        return self.search(tag, limit=limit)

    def get_stats(self) -> Dict:
        """Aggregate counts (total, by language/type/framework) plus the
        repository listing, busiest repositories first."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        stats = {}
        cursor.execute("SELECT COUNT(*) FROM components")
        stats['total_components'] = cursor.fetchone()[0]
        breakdowns = (
            ('by_language', "SELECT language, COUNT(*) as count FROM components GROUP BY language ORDER BY count DESC"),
            ('by_type', "SELECT type, COUNT(*) as count FROM components GROUP BY type ORDER BY count DESC"),
            ('by_framework', "SELECT framework, COUNT(*) as count FROM components WHERE framework IS NOT NULL GROUP BY framework ORDER BY count DESC"),
        )
        for key, breakdown_sql in breakdowns:
            cursor.execute(breakdown_sql)
            stats[key] = dict(cursor.fetchall())
        # Switch to Row objects for the repository listing only.
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM repositories ORDER BY component_count DESC")
        stats['repositories'] = [dict(row) for row in cursor.fetchall()]
        conn.close()
        return stats
class LibraryAgent:
    """Agent interface for the code library.

    Wraps LibrarySearch with a loose natural-language front end: it guesses
    filters and keywords from the question text and formats the top results.
    """

    def __init__(self, library_path: str = "~/blackroad-code-library"):
        self.search = LibrarySearch(library_path)

    def ask(self, question: str) -> str:
        """
        Natural language interface for agents.
        Examples:
        - "Show me authentication implementations"
        - "I need a React sidebar"
        - "Find JWT components"
        - "What database code do we have?"
        """
        question_lower = question.lower()
        # Bug fix: the old check `'ts' in question_lower` was a substring
        # test, so any word containing "ts" (e.g. "components") wrongly
        # forced the TypeScript filter.  Match 'ts' as a whole word instead.
        words = set(re.findall(r"[a-z0-9.+]+", question_lower))
        # Detect filters
        filters = {}
        if 'react' in question_lower:
            filters['framework'] = 'react'
        if 'typescript' in question_lower or 'ts' in words:
            filters['language'] = 'typescript'
        if 'python' in question_lower:
            filters['language'] = 'python'
        if 'nextjs' in question_lower or 'next.js' in question_lower:
            filters['framework'] = 'nextjs'
        # Map loose phrasing onto canonical search categories.
        keywords = []
        patterns = {
            'authentication': ['auth', 'authentication', 'login', 'jwt', 'oauth'],
            'sidebar': ['sidebar', 'navigation', 'nav'],
            'chat': ['chat', 'message', 'conversation'],
            'database': ['database', 'db', 'postgres', 'redis', 'supabase'],
            'api': ['api', 'endpoint', 'route'],
            'form': ['form', 'input', 'validation'],
            'state': ['state', 'store', 'zustand', 'redux'],
            'deployment': ['deploy', 'docker', 'cloudflare', 'railway'],
        }
        for category, terms in patterns.items():
            if any(term in question_lower for term in terms):
                keywords.append(category)
        # Fallback: use the question's own longer words as search terms.
        if not keywords:
            split_words = question_lower.split()
            keywords = [w for w in split_words if len(w) > 3 and w not in ['show', 'find', 'need', 'have', 'what']]
        query = ' '.join(keywords) if keywords else question
        results = self.search.search(query, filters, limit=5)
        # Format response
        if not results:
            return f"❌ No components found for: {question}\n\nTry:\n- Being more specific\n- Using different keywords\n- Checking the library stats: --stats"
        response = f"✅ Found {len(results)} component(s) for: {question}\n"
        response += "=" * 70 + "\n"
        for i, comp in enumerate(results, 1):
            tags = json.loads(comp['tags'])
            deps = json.loads(comp['dependencies'])
            response += f"\n{i}. ⭐ {comp['name']} ({comp['language']}/{comp['type']}) - Quality: {comp['quality_score']:.1f}/10\n"
            response += f" 📍 Location: {comp['repo']}/{Path(comp['file_path']).name}:{comp['start_line']}\n"
            response += f" 🏷️ Tags: {', '.join(tags[:5])}\n"
            if comp.get('framework'):
                response += f" 🔧 Framework: {comp['framework']}\n"
            if deps:
                response += f" 📦 Deps: {', '.join(deps[:3])}\n"
            response += f"\n Preview:\n"
            # First five lines of the stored snippet only.
            preview_lines = comp['code_snippet'].split('\n')[:5]
            for line in preview_lines:
                response += f" {line}\n"
            response += " ...\n"
        response += "\n" + "=" * 70
        response += f"\n\nTo see full details: blackroad-library-search.py --id <component_id>"
        response += f"\nTo extract code: cat {results[0]['file_path']}"
        return response

    def find_by_category(self, category: str) -> List[Dict]:
        """
        Find components by category.
        Categories:
        - authentication
        - database
        - ui-components
        - api
        - state-management
        - deployment
        """
        return self.search.search_by_tag(category, limit=10)

    def get_best_match(self, query: str, filters: Optional[Dict] = None) -> Optional[Dict]:
        """Get the single best matching component, or None."""
        results = self.search.search(query, filters, limit=1)
        return results[0] if results else None

    def quick_stats(self) -> str:
        """Return a short human-readable summary of the library contents."""
        stats = self.search.get_stats()
        output = f"""
📚 BlackRoad Code Library Quick Stats
{'=' * 50}
Total Components: {stats['total_components']}
Top Languages:
"""
        for lang, count in list(stats['by_language'].items())[:5]:
            output += f"{lang}: {count}\n"
        output += f"\nTop Types:\n"
        for type_, count in list(stats['by_type'].items())[:5]:
            output += f"{type_}: {count}\n"
        output += f"\nTop Repositories:\n"
        for repo in stats['repositories'][:5]:
            output += f"{repo['name']}: {repo['component_count']} components\n"
        return output
def main():
    """CLI for agent API: --stats, one-shot question, or interactive loop."""
    import argparse
    parser = argparse.ArgumentParser(description='Agent interface for code library')
    parser.add_argument('question', nargs='*', help='Natural language question')
    parser.add_argument('--library', default='~/blackroad-code-library', help='Library path')
    parser.add_argument('--stats', action='store_true', help='Show quick stats')
    args = parser.parse_args()
    agent = LibraryAgent(args.library)
    if args.stats:
        print(agent.quick_stats())
        return
    if args.question:
        # Single question mode
        print(agent.ask(' '.join(args.question)))
        return
    # Interactive mode
    print("🤖 BlackRoad Library Agent")
    print("Ask me anything about the code library!")
    print("Examples:")
    print(" - Show me authentication implementations")
    print(" - I need a React sidebar")
    print(" - Find JWT components")
    print("\nType 'exit' to quit.\n")
    while True:
        try:
            question = input("")
            if question.lower() in ['exit', 'quit', 'q']:
                break
            response = agent.ask(question)
            print(f"\n{response}\n")
        except (KeyboardInterrupt, EOFError):
            print("\nGoodbye!")
            break
if __name__ == '__main__':
    main()

456
scripts/python/codex-scanner.py Executable file
View File

@@ -0,0 +1,456 @@
#!/usr/bin/env python3
"""
BlackRoad Code Library Scanner
Scans 66 repos and extracts reusable components into searchable library.
"""
import os
import sqlite3
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
import subprocess
import re
class ComponentScanner:
    """Scans repositories and extracts reusable components."""

    def __init__(self, library_path: str = "~/blackroad-code-library"):
        """Create the library directory tree and SQLite index if missing.

        Args:
            library_path: root of the component library; '~' is expanded.
        """
        self.library_path = Path(library_path).expanduser()
        self.library_path.mkdir(parents=True, exist_ok=True)
        self.db_path = self.library_path / "index" / "components.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.init_database()
def init_database(self):
"""Initialize SQLite database for components."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS components (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
language TEXT,
framework TEXT,
repo TEXT NOT NULL,
file_path TEXT NOT NULL,
start_line INT,
end_line INT,
created_at TIMESTAMP,
last_used_at TIMESTAMP,
quality_score REAL,
usage_count INT DEFAULT 0,
dependencies TEXT, -- JSON
tags TEXT, -- JSON
description TEXT,
code_hash TEXT,
code_snippet TEXT -- First 500 chars
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS usage_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
component_id TEXT,
used_in_project TEXT,
used_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (component_id) REFERENCES components(id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS repositories (
name TEXT PRIMARY KEY,
path TEXT NOT NULL,
last_scanned TIMESTAMP,
component_count INT DEFAULT 0,
total_files INT DEFAULT 0
)
""")
conn.commit()
conn.close()
def scan_repository(self, repo_path: str) -> List[Dict]:
"""Scan a single repository for components."""
repo_path = Path(repo_path).expanduser()
repo_name = repo_path.name
print(f"📂 Scanning {repo_name}...")
components = []
# File patterns to scan
patterns = {
'typescript': ['**/*.ts', '**/*.tsx'],
'javascript': ['**/*.js', '**/*.jsx'],
'python': ['**/*.py'],
'go': ['**/*.go'],
'rust': ['**/*.rs'],
}
for lang, pattern_list in patterns.items():
for pattern in pattern_list:
for file_path in repo_path.glob(pattern):
# Skip node_modules, dist, build, etc.
if any(x in file_path.parts for x in ['node_modules', 'dist', 'build', '.next', 'venv', '__pycache__']):
continue
components.extend(self.extract_components_from_file(
file_path, repo_name, lang
))
# Update repository metadata
self.update_repo_metadata(repo_name, str(repo_path), len(components))
return components
def extract_components_from_file(self, file_path: Path, repo: str, language: str) -> List[Dict]:
"""Extract components from a single file."""
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
except Exception as e:
print(f" ⚠️ Could not read {file_path}: {e}")
return []
components = []
# Extract different types based on language
if language in ['typescript', 'javascript']:
components.extend(self.extract_typescript_components(content, file_path, repo))
elif language == 'python':
components.extend(self.extract_python_components(content, file_path, repo))
return components
def extract_typescript_components(self, content: str, file_path: Path, repo: str) -> List[Dict]:
"""Extract React components, functions, classes from TypeScript/JavaScript."""
components = []
lines = content.split('\n')
# Pattern 1: React components (function or const)
react_component_pattern = r'^(?:export\s+)?(?:const|function)\s+([A-Z][a-zA-Z0-9]*)\s*[=:]'
# Pattern 2: Named exports (utilities, helpers)
export_function_pattern = r'^export\s+(?:async\s+)?function\s+([a-zA-Z_][a-zA-Z0-9_]*)'
# Pattern 3: Classes
class_pattern = r'^export\s+class\s+([A-Z][a-zA-Z0-9]*)'
for i, line in enumerate(lines):
component_name = None
component_type = None
# Check for React components
match = re.search(react_component_pattern, line)
if match:
component_name = match.group(1)
component_type = 'react-component' if 'tsx' in file_path.suffix else 'function'
# Check for exported functions
if not match:
match = re.search(export_function_pattern, line)
if match:
component_name = match.group(1)
component_type = 'function'
# Check for classes
if not match:
match = re.search(class_pattern, line)
if match:
component_name = match.group(1)
component_type = 'class'
if component_name:
# Find end of component (heuristic: next export or end of file)
end_line = i + 1
brace_count = 0
for j in range(i, len(lines)):
if '{' in lines[j]:
brace_count += lines[j].count('{')
if '}' in lines[j]:
brace_count -= lines[j].count('}')
if brace_count == 0 and j > i:
end_line = j + 1
break
component = self.create_component_entry(
name=component_name,
type=component_type,
language='typescript',
repo=repo,
file_path=str(file_path),
start_line=i + 1,
end_line=end_line,
code_snippet='\n'.join(lines[i:min(i+20, end_line)])
)
components.append(component)
return components
def extract_python_components(self, content: str, file_path: Path, repo: str) -> List[Dict]:
"""Extract functions and classes from Python files."""
components = []
lines = content.split('\n')
# Pattern 1: Functions
function_pattern = r'^(?:async\s+)?def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\('
# Pattern 2: Classes
class_pattern = r'^class\s+([A-Z][a-zA-Z0-9_]*)\s*[:(]'
for i, line in enumerate(lines):
component_name = None
component_type = None
# Check for functions
match = re.search(function_pattern, line)
if match:
component_name = match.group(1)
component_type = 'function'
# Check for classes
if not match:
match = re.search(class_pattern, line)
if match:
component_name = match.group(1)
component_type = 'class'
if component_name and not component_name.startswith('_'): # Skip private
# Find end (heuristic: next def/class at same indentation or end of file)
indent = len(line) - len(line.lstrip())
end_line = len(lines)
for j in range(i + 1, len(lines)):
current_indent = len(lines[j]) - len(lines[j].lstrip())
if lines[j].strip() and current_indent <= indent:
if lines[j].strip().startswith(('def ', 'class ', 'async def ')):
end_line = j
break
component = self.create_component_entry(
name=component_name,
type=component_type,
language='python',
repo=repo,
file_path=str(file_path),
start_line=i + 1,
end_line=end_line,
code_snippet='\n'.join(lines[i:min(i+20, end_line)])
)
components.append(component)
return components
def create_component_entry(self, name: str, type: str, language: str,
repo: str, file_path: str, start_line: int,
end_line: int, code_snippet: str) -> Dict:
"""Create a component entry with metadata."""
# Generate unique ID
component_id = hashlib.sha256(
f"{repo}:{file_path}:{name}:{start_line}".encode()
).hexdigest()[:16]
# Calculate code hash (for detecting duplicates)
code_hash = hashlib.md5(code_snippet.encode()).hexdigest()
# Extract dependencies (basic heuristic)
dependencies = self.extract_dependencies(code_snippet, language)
# Auto-tag based on patterns
tags = self.auto_tag_component(name, type, code_snippet, file_path)
return {
'id': component_id,
'name': name,
'type': type,
'language': language,
'framework': self.detect_framework(file_path, code_snippet),
'repo': repo,
'file_path': file_path,
'start_line': start_line,
'end_line': end_line,
'created_at': datetime.now().isoformat(),
'dependencies': json.dumps(dependencies),
'tags': json.dumps(tags),
'code_hash': code_hash,
'code_snippet': code_snippet[:500], # First 500 chars
'description': self.generate_description(name, type, tags),
'quality_score': 5.0 # Default, will be calculated later
}
def extract_dependencies(self, code: str, language: str) -> List[str]:
"""Extract dependencies from code snippet."""
deps = []
if language in ['typescript', 'javascript']:
# Find imports
import_pattern = r"import\s+.*?\s+from\s+['\"]([^'\"]+)['\"]"
deps = re.findall(import_pattern, code)
elif language == 'python':
# Find imports
import_pattern = r"(?:from\s+([^\s]+)\s+import|import\s+([^\s]+))"
matches = re.findall(import_pattern, code)
deps = [m[0] or m[1] for m in matches if not (m[0] or m[1]).startswith('.')]
return list(set(deps))[:10] # Max 10 deps
def auto_tag_component(self, name: str, type: str, code: str, file_path: str) -> List[str]:
"""Auto-generate tags based on component characteristics."""
tags = [type]
# Path-based tags
path_lower = file_path.lower()
if 'auth' in path_lower:
tags.append('authentication')
if 'api' in path_lower:
tags.append('api')
if 'component' in path_lower or 'ui' in path_lower:
tags.append('ui')
if 'util' in path_lower or 'helper' in path_lower:
tags.append('utility')
if 'store' in path_lower or 'state' in path_lower:
tags.append('state-management')
# Name-based tags
name_lower = name.lower()
if 'sidebar' in name_lower or 'nav' in name_lower:
tags.append('navigation')
if 'modal' in name_lower or 'dialog' in name_lower:
tags.append('modal')
if 'button' in name_lower:
tags.append('button')
if 'form' in name_lower or 'input' in name_lower:
tags.append('form')
if 'chat' in name_lower or 'message' in name_lower:
tags.append('chat')
# Code-based tags
code_lower = code.lower()
if 'jwt' in code_lower or 'token' in code_lower:
tags.append('jwt')
if 'postgres' in code_lower or 'pg' in code_lower:
tags.append('postgresql')
if 'redis' in code_lower:
tags.append('redis')
if 'websocket' in code_lower or 'socket.io' in code_lower:
tags.append('websocket')
if 'tailwind' in code_lower:
tags.append('tailwind')
return list(set(tags))
def detect_framework(self, file_path: str, code: str) -> Optional[str]:
"""Detect framework being used."""
code_lower = code.lower()
if 'react' in code_lower or 'useState' in code or 'useEffect' in code:
return 'react'
if 'next' in file_path.lower() or 'next/' in code_lower:
return 'nextjs'
if 'express' in code_lower:
return 'express'
if 'fastapi' in code_lower:
return 'fastapi'
if 'django' in code_lower:
return 'django'
return None
def generate_description(self, name: str, type: str, tags: List[str]) -> str:
"""Generate a basic description for the component."""
return f"{type.title()}: {name} ({', '.join(tags[:3])})"
def save_components(self, components: List[Dict]):
"""Save components to database."""
if not components:
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for comp in components:
cursor.execute("""
INSERT OR REPLACE INTO components (
id, name, type, language, framework, repo, file_path,
start_line, end_line, created_at, dependencies, tags,
code_hash, code_snippet, description, quality_score
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
comp['id'], comp['name'], comp['type'], comp['language'],
comp['framework'], comp['repo'], comp['file_path'],
comp['start_line'], comp['end_line'], comp['created_at'],
comp['dependencies'], comp['tags'], comp['code_hash'],
comp['code_snippet'], comp['description'], comp['quality_score']
))
conn.commit()
conn.close()
print(f" ✅ Saved {len(components)} components")
def update_repo_metadata(self, repo_name: str, repo_path: str, component_count: int):
"""Update repository metadata."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO repositories (name, path, last_scanned, component_count)
VALUES (?, ?, ?, ?)
""", (repo_name, repo_path, datetime.now().isoformat(), component_count))
conn.commit()
conn.close()
def scan_all_repos(self, repos_base_path: str = "~/projects"):
"""Scan all repositories in a base directory."""
repos_path = Path(repos_base_path).expanduser()
print(f"🔍 Scanning all repos in {repos_path}...\n")
total_components = 0
repo_count = 0
for repo_dir in repos_path.iterdir():
if repo_dir.is_dir() and not repo_dir.name.startswith('.'):
components = self.scan_repository(repo_dir)
self.save_components(components)
total_components += len(components)
repo_count += 1
print(f"\n✅ Scanned {repo_count} repositories")
print(f"📦 Found {total_components} components")
print(f"💾 Library saved to: {self.library_path}")
return total_components
def main():
    """CLI entry point: scan one repository or every repo under a base dir."""
    import argparse

    parser = argparse.ArgumentParser(description='Scan repositories for reusable components')
    parser.add_argument('--repos', default='~/projects', help='Path to repositories')
    parser.add_argument('--library', default='~/blackroad-code-library', help='Library output path')
    parser.add_argument('--repo', help='Scan single repository')
    opts = parser.parse_args()

    scanner = ComponentScanner(opts.library)
    if opts.repo:
        # Single-repository mode: scan and persist immediately.
        scanner.save_components(scanner.scan_repository(opts.repo))
    else:
        scanner.scan_all_repos(opts.repos)


if __name__ == '__main__':
    main()

345
scripts/python/codex-search.py Executable file
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
BlackRoad Code Library Search
Search and retrieve components from the code library.
"""
import sqlite3
import json
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import re
class LibrarySearch:
    """Search interface for the code library.

    Thin read-only layer over the SQLite index produced by the scanner at
    ``<library>/index/components.db``.
    """

    def __init__(self, library_path: str = "~/blackroad-code-library"):
        """Open the library rooted at *library_path* (``~`` is expanded).

        Raises:
            FileNotFoundError: if the index database has not been built yet.
        """
        self.library_path = Path(library_path).expanduser()
        self.db_path = self.library_path / "index" / "components.db"
        if not self.db_path.exists():
            raise FileNotFoundError(f"Library database not found at {self.db_path}. Run scanner first.")

    def search(self, query: str, filters: Optional[Dict] = None, limit: int = 10) -> List[Dict]:
        """
        Search for components.

        Args:
            query: Search query (substring-matched against name, tags,
                description and type).
            filters: Optional filters (language, type, framework,
                min_quality, max_age_days, repo).
            limit: Max results to return.

        Returns:
            List of matching components as plain dicts, best first
            (quality score, then recency).
        """
        filters = filters or {}
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Base query: LIKE over the text-ish columns.
        sql = """
            SELECT * FROM components
            WHERE (
                name LIKE ? OR
                tags LIKE ? OR
                description LIKE ? OR
                type LIKE ?
            )
        """
        params = [f"%{query}%"] * 4
        # Optional equality / threshold filters appended dynamically.
        if 'language' in filters:
            sql += " AND language = ?"
            params.append(filters['language'])
        if 'type' in filters:
            sql += " AND type = ?"
            params.append(filters['type'])
        if 'framework' in filters:
            sql += " AND framework = ?"
            params.append(filters['framework'])
        if 'min_quality' in filters:
            sql += " AND quality_score >= ?"
            params.append(filters['min_quality'])
        if 'max_age_days' in filters:
            # "Age" counts either creation or most recent use.
            cutoff_date = (datetime.now() - timedelta(days=filters['max_age_days'])).isoformat()
            sql += " AND (last_used_at >= ? OR created_at >= ?)"
            params.extend([cutoff_date, cutoff_date])
        if 'repo' in filters:
            sql += " AND repo = ?"
            params.append(filters['repo'])
        # Order by quality and recency
        sql += " ORDER BY quality_score DESC, created_at DESC LIMIT ?"
        params.append(limit)
        cursor.execute(sql, params)
        rows = cursor.fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def search_by_tag(self, tag: str, limit: int = 10) -> List[Dict]:
        """Search components by tag (plain substring search on the tag)."""
        return self.search(tag, limit=limit)

    def get_component(self, component_id: str) -> Optional[Dict]:
        """Get a specific component by ID, or None if it does not exist."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM components WHERE id = ?", (component_id,))
        row = cursor.fetchone()
        conn.close()
        return dict(row) if row else None

    def get_similar_components(self, component_id: str, limit: int = 5) -> List[Dict]:
        """
        Find similar components based on:
        - Same type
        - Overlapping tags (first tag only)
        - Same language/framework

        Returns an empty list if *component_id* is unknown.
        """
        component = self.get_component(component_id)
        if not component:
            return []
        # BUG FIX: the original bound 5 parameters to 6 placeholders
        # (component['framework'] was missing), so every call raised
        # sqlite3.ProgrammingError. Also guard against an empty tag list,
        # which would have raised IndexError on tags[0].
        tags = json.loads(component['tags'] or '[]')
        tag_pattern = f"%{tags[0]}%" if tags else "%"
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("""
            SELECT * FROM components
            WHERE id != ?
            AND (
                type = ? OR
                language = ? OR
                framework = ? OR
                tags LIKE ?
            )
            ORDER BY quality_score DESC
            LIMIT ?
        """, (
            component_id,
            component['type'],
            component['language'],
            component['framework'],
            tag_pattern,
            limit
        ))
        rows = cursor.fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def get_stats(self) -> Dict:
        """Get library statistics (totals, breakdowns, repository list)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        stats = {}
        # Total components
        cursor.execute("SELECT COUNT(*) FROM components")
        stats['total_components'] = cursor.fetchone()[0]
        # By language
        cursor.execute("""
            SELECT language, COUNT(*) as count
            FROM components
            GROUP BY language
            ORDER BY count DESC
        """)
        stats['by_language'] = dict(cursor.fetchall())
        # By type
        cursor.execute("""
            SELECT type, COUNT(*) as count
            FROM components
            GROUP BY type
            ORDER BY count DESC
        """)
        stats['by_type'] = dict(cursor.fetchall())
        # By framework (NULL frameworks excluded)
        cursor.execute("""
            SELECT framework, COUNT(*) as count
            FROM components
            WHERE framework IS NOT NULL
            GROUP BY framework
            ORDER BY count DESC
        """)
        stats['by_framework'] = dict(cursor.fetchall())
        # Quality distribution, bucketed by rounded score.
        cursor.execute("""
            SELECT
                ROUND(quality_score) as score,
                COUNT(*) as count
            FROM components
            GROUP BY ROUND(quality_score)
            ORDER BY score DESC
        """)
        stats['quality_distribution'] = dict(cursor.fetchall())
        # Repository stats. FIX: the original ran this query twice (once
        # before enabling the Row factory) and discarded the first result
        # set; run it once on a Row-producing cursor instead.
        conn.row_factory = sqlite3.Row
        repo_cursor = conn.cursor()
        repo_cursor.execute("SELECT * FROM repositories ORDER BY component_count DESC")
        stats['repositories'] = [dict(row) for row in repo_cursor.fetchall()]
        conn.close()
        return stats

    def format_result(self, component: Dict) -> str:
        """Format a single component as a detailed multi-line report."""
        tags = json.loads(component['tags'])
        deps = json.loads(component['dependencies'])
        output = f"""
{'='*70}
{component['name']} (Quality: {component['quality_score']}/10)
{'='*70}
Type: {component['type']}
Language: {component['language']}
Framework: {component.get('framework') or 'N/A'}
Repository: {component['repo']}
Location: {component['file_path']}:{component['start_line']}-{component['end_line']}
Tags: {', '.join(tags[:8])}
Dependencies: {', '.join(deps[:5]) if deps else 'None'}
Description:
{component['description']}
Code Preview:
{component['code_snippet']}
...
Full path: {component['file_path']}
"""
        return output

    def format_results(self, components: List[Dict]) -> str:
        """Format a result list as a short numbered summary."""
        if not components:
            return "No components found."
        output = f"\nFound {len(components)} component(s):\n"
        for i, comp in enumerate(components, 1):
            tags = json.loads(comp['tags'])
            output += f"\n{i}. ⭐ {comp['name']} ({comp['language']}/{comp['type']}) - {comp['quality_score']:.1f}/10\n"
            output += f" 📍 {comp['repo']}/{Path(comp['file_path']).name}:{comp['start_line']}\n"
            output += f" 🏷️ {', '.join(tags[:5])}\n"
        return output
def main():
    """CLI interface for library search.

    Supports free-text search with filters, library statistics (--stats),
    direct lookup by component ID (--id), and similarity search
    (--similar). Prints full details automatically when exactly one
    result matches.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Search the BlackRoad code library')
    parser.add_argument('query', nargs='?', help='Search query')
    parser.add_argument('--library', default='~/blackroad-code-library', help='Library path')
    parser.add_argument('--language', help='Filter by language')
    parser.add_argument('--type', help='Filter by type')
    parser.add_argument('--framework', help='Filter by framework')
    parser.add_argument('--min-quality', type=float, help='Minimum quality score')
    parser.add_argument('--max-age', type=int, help='Max age in days')
    parser.add_argument('--limit', type=int, default=10, help='Max results')
    parser.add_argument('--stats', action='store_true', help='Show library statistics')
    parser.add_argument('--id', help='Get specific component by ID')
    parser.add_argument('--similar', help='Find similar components to ID')
    args = parser.parse_args()
    search = LibrarySearch(args.library)
    # Show stats
    if args.stats:
        stats = search.get_stats()
        print("\n📊 Library Statistics")
        print("=" * 70)
        print(f"\nTotal Components: {stats['total_components']}")
        print("\nBy Language:")
        for lang, count in stats['by_language'].items():
            print(f" {lang}: {count}")
        print("\nBy Type:")
        for type_, count in stats['by_type'].items():
            print(f" {type_}: {count}")
        if stats['by_framework']:
            print("\nBy Framework:")
            for framework, count in stats['by_framework'].items():
                print(f" {framework}: {count}")
        print("\nRepositories:")
        for repo in stats['repositories'][:10]:
            print(f" {repo['name']}: {repo['component_count']} components")
        return
    # Get specific component
    if args.id:
        comp = search.get_component(args.id)
        if comp:
            print(search.format_result(comp))
        else:
            print(f"Component {args.id} not found.")
        return
    # Find similar components
    if args.similar:
        comps = search.get_similar_components(args.similar, args.limit)
        print(search.format_results(comps))
        return
    # Search
    if not args.query:
        parser.print_help()
        return
    filters = {}
    if args.language:
        filters['language'] = args.language
    if args.type:
        filters['type'] = args.type
    if args.framework:
        filters['framework'] = args.framework
    # BUG FIX: compare against None so explicit zero values are honoured
    # (`--min-quality 0` and `--max-age 0` were silently ignored before
    # because 0 is falsy).
    if args.min_quality is not None:
        filters['min_quality'] = args.min_quality
    if args.max_age is not None:
        filters['max_age_days'] = args.max_age
    results = search.search(args.query, filters, args.limit)
    print(search.format_results(results))
    # If only one result, show details
    if len(results) == 1:
        print("\n" + "=" * 70)
        print("Showing details for single result:")
        print(search.format_result(results[0]))


if __name__ == '__main__':
    main()

145
scripts/python/dns-system.py Executable file
View File

@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""
BLACKROAD DNS - ALL TRAFFIC ROUTES THROUGH BLACKROAD
No upstream DNS. Everything is BlackRoad.
"""
import socket
import subprocess
import threading
import struct
import os
LISTEN_PORT = 53
# BlackRoad login state
BLACKROAD_AUTHENTICATED = os.environ.get('BLACKROAD_AUTH', '0') == '1'
def blackroad_login():
    """BlackRoad authentication gate.

    Currently a stub: prints the login banner and always grants access.
    Real credential checking is still to be wired in.
    """
    banner = (
        "\n╔═══════════════════════════════════════╗",
        "║ BLACKROAD LOGIN ║",
        "╚═══════════════════════════════════════╝",
    )
    for banner_line in banner:
        print(banner_line)
    return True  # Always allow for now - add auth later
def query_claude(name):
    """Ask the local Claude CLI about a DNS name; never raises.

    Returns Claude's stripped stdout, or a "BLACKROAD:"-prefixed fallback
    string when the output is empty or the subprocess fails for any
    reason (missing binary, non-zero exit, 30s timeout, ...).
    """
    try:
        proc = subprocess.run(
            ["claude", "--print", f"DNS query for: {name}"],
            capture_output=True, text=True, timeout=30
        )
    except Exception as e:
        return f"BLACKROAD: {e}"
    # Empty stdout (e.g. CLI printed nothing) gets the canned fallback.
    return proc.stdout.strip() if proc.stdout else "BLACKROAD: no response"
def build_dns_response(query_data, ip_address):
    """Build a minimal DNS A-record response for *query_data*.

    Echoes the transaction ID and question section from the query and
    appends a single A record pointing at *ip_address* (TTL 60s), using a
    compression pointer back to the QNAME at offset 12.
    """
    # Walk the QNAME labels to find where the question section ends.
    offset = 12
    while query_data[offset] != 0:
        offset += query_data[offset] + 1
    offset += 5  # null byte + QTYPE (2) + QCLASS (2)

    header = (
        query_data[:2]                          # transaction ID echoed back
        + b'\x81\x80'                           # flags: standard response, no error
        + b'\x00\x01\x00\x01\x00\x00\x00\x00'   # QD=1, AN=1, NS=0, AR=0
    )
    question = query_data[12:offset]

    record = bytearray(b'\xc0\x0c')    # pointer to the name in the question
    record += b'\x00\x01'              # type A
    record += b'\x00\x01'              # class IN
    record += b'\x00\x00\x00\x3c'      # TTL 60 seconds
    record += b'\x00\x04'              # RDLENGTH 4
    record += bytes(int(part) for part in ip_address.split('.'))

    return header + question + bytes(record)
def handle_query(data, addr, sock):
    """Handle one DNS query - ALL traffic goes through BlackRoad.

    Parses the queried name out of the raw packet, logs the access via the
    Claude CLI, and answers every name with BlackRoad's gateway IP. If the
    response cannot be built, replies with NXDOMAIN instead.

    Args:
        data: Raw DNS query packet bytes.
        addr: Client (host, port) tuple to reply to.
        sock: Bound UDP socket used for the reply.
    """
    # Parse domain name labels (they start after the 12-byte header).
    pos = 12
    labels = []
    try:
        while pos < len(data) and data[pos] != 0:
            length = data[pos]
            labels.append(data[pos+1:pos+1+length].decode('utf-8', errors='ignore'))
            pos += length + 1
    except Exception:
        # Best effort: a malformed packet just yields whatever labels we
        # managed to read. FIX: was a bare `except:`, which would also
        # swallow KeyboardInterrupt/SystemExit.
        pass
    name = '.'.join(labels).lower()
    print(f"\n[BLACKROAD DNS] Query: {name}")
    print(f" -> ALL TRAFFIC ROUTES THROUGH BLACKROAD")
    # Log to Claude what's being accessed
    response_text = query_claude(name)
    print(f" <- Claude: {response_text[:100]}...")
    # Return BlackRoad's IP for EVERYTHING: all domains resolve to the
    # local gateway; actual routing/proxying happens at the web layer.
    BLACKROAD_IP = "127.0.0.1"  # Local BlackRoad gateway
    try:
        response = build_dns_response(data, BLACKROAD_IP)
        sock.sendto(response, addr)
        print(f" -> Resolved {name} to {BLACKROAD_IP} (BLACKROAD)")
    except Exception as e:
        print(f" ERROR: {e}")
        # Send NXDOMAIN on error (set QR bit + RCODE 3 in the header).
        nxdomain = bytearray(data[:12])
        nxdomain[2] = 0x81
        nxdomain[3] = 0x83
        sock.sendto(bytes(nxdomain) + data[12:], addr)
def main():
    """Start the BlackRoad DNS server on 127.0.0.1:53.

    Requires root privileges for the low port. Each incoming query is
    handed to handle_query on its own daemon thread; the receive loop
    never returns.
    """
    for banner_line in (
        "╔═══════════════════════════════════════╗",
        "║ BLACKROAD DNS - ALL IS BLACKROAD ║",
        "╚═══════════════════════════════════════╝",
        "",
        "NO UPSTREAM DNS. EVERYTHING ROUTES HERE.",
        "google.com -> BLACKROAD",
        "facebook.com -> BLACKROAD",
        "*.* -> BLACKROAD",
        "",
    ):
        print(banner_line)

    if not blackroad_login():
        print("BLACKROAD: Authentication required")
        return
    print("BLACKROAD: Authenticated")
    print("")

    server_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        server_sock.bind(('127.0.0.1', LISTEN_PORT))
    except PermissionError:
        # Binding port 53 needs elevated privileges.
        print(f"ERROR: Need sudo for port {LISTEN_PORT}")
        print("Run: sudo python3 ~/blackroad-dns-system.py")
        return

    print(f"Listening on 127.0.0.1:{LISTEN_PORT}")
    print("All DNS queries now route through BlackRoad + Claude")
    print("")
    while True:
        data, addr = server_sock.recvfrom(4096)
        threading.Thread(
            target=handle_query, args=(data, addr, server_sock), daemon=True
        ).start()


if __name__ == "__main__":
    main()

121
scripts/python/fleet-monitor.py Executable file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
BlackRoad Fleet Monitor - Real-time Pi metrics API
Serves JSON data for all fleet devices
"""
import json
import subprocess
import time
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
FLEET = {
'aria': '192.168.4.82',
'lucidia': '192.168.4.81',
'alice': '192.168.4.49',
'octavia': '192.168.4.38',
'cecilia': '192.168.4.89'
}
def get_device_metrics(hostname, ip):
    """Get real-time metrics from a Pi over SSH.

    Runs a handful of short SSH probes (uptime, free, df, nproc, pgrep)
    against *hostname*. Any failure — unreachable host, SSH timeout,
    unparseable output — yields an 'offline' record carrying the error
    text instead of raising.

    Args:
        hostname: SSH alias/hostname of the device (resolved via the
            local SSH config — assumed passwordless; TODO confirm).
        ip: IP address recorded verbatim in the result.

    Returns:
        Dict with status 'online' plus load/memory/disk/ollama fields,
        or status 'offline' with an 'error' field.
    """
    def _ssh(remote_cmd):
        # One short-lived SSH call per probe, with tight connect and
        # overall timeouts so a dead host fails fast.
        cmd = f"ssh -o ConnectTimeout=3 {hostname} {remote_cmd}"
        return subprocess.check_output(cmd, shell=True, timeout=5).decode().strip()

    def _timestamp():
        # datetime.utcnow() is deprecated (3.12+); keep the old naive
        # "...Z" wire format for dashboard compatibility.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'

    try:
        # 1-minute load average parsed out of `uptime`.
        uptime_raw = _ssh("'uptime'")
        load_parts = uptime_raw.split('load average:')[1].strip().split(',')
        load_1min = float(load_parts[0].strip())
        # Memory from `free -m` (values in MiB).
        mem_parts = _ssh("\"free -m | grep Mem:\"").split()
        mem_total = int(mem_parts[1])
        mem_used = int(mem_parts[2])
        mem_percent = round((mem_used / mem_total) * 100, 1)
        # Root filesystem usage percentage from `df`.
        disk_parts = _ssh("\"df -h / | tail -1\"").split()
        disk_used = disk_parts[4].replace('%', '')
        # CPU count.
        cpu_count = _ssh("'nproc'")
        # Ollama process presence.
        ollama_status = _ssh("'pgrep ollama >/dev/null && echo running || echo stopped'")
        return {
            'hostname': hostname,
            'ip': ip,
            'status': 'online',
            'load': load_1min,
            'cpu_cores': int(cpu_count),
            'memory_percent': mem_percent,
            'memory_used_mb': mem_used,
            'memory_total_mb': mem_total,
            'disk_percent': int(disk_used),
            'ollama': ollama_status,
            'timestamp': _timestamp()
        }
    except Exception as e:
        # Any probe/parsing failure marks the device offline.
        return {
            'hostname': hostname,
            'ip': ip,
            'status': 'offline',
            'error': str(e),
            'timestamp': _timestamp()
        }
class MonitorHandler(BaseHTTPRequestHandler):
    """HTTP handler serving fleet metrics.

    Routes:
        /           - tiny HTML landing page listing the endpoints
        /api/fleet  - live metrics for every device in FLEET (JSON)
    Anything else returns 404.
    """

    def log_message(self, format, *args):
        # Override to silence the default per-request stderr logging.
        pass  # Silence logs

    def do_GET(self):
        # Dispatch on the request path.
        if self.path == '/api/fleet':
            # Get metrics from all devices, serially; each SSH probe has
            # its own short timeout, so an offline host only delays the
            # response by a few seconds.
            metrics = {}
            for hostname, ip in FLEET.items():
                print(f"Fetching {hostname}...")
                metrics[hostname] = get_device_metrics(hostname, ip)
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            # Allow browser dashboards served from other origins to poll
            # this API (CORS).
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            self.wfile.write(json.dumps(metrics, indent=2).encode())
        elif self.path == '/':
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.end_headers()
            self.wfile.write(b"""
<!DOCTYPE html>
<html><body style="font-family: monospace; padding: 20px; background: #000; color: #fff;">
<h1 style="color: #FF0066;">BlackRoad Fleet Monitor API</h1>
<p>Endpoints:</p>
<ul>
<li><a href="/api/fleet" style="color: #0066FF;">/api/fleet</a> - Get all fleet metrics (JSON)</li>
</ul>
<p>Dashboard: <a href="file:///Users/alexa/blackroad-live-monitor.html" style="color: #FF0066;">blackroad-live-monitor.html</a></p>
</body></html>
""")
        else:
            self.send_response(404)
            self.end_headers()
if __name__ == '__main__':
    # Serve the fleet-metrics API on localhost until interrupted.
    PORT = 8888
    for startup_line in (
        f"🚀 BlackRoad Fleet Monitor API",
        f" Listening on: http://localhost:{PORT}",
        f" Endpoint: http://localhost:{PORT}/api/fleet",
        f" Monitoring: {', '.join(FLEET.keys())}",
        f"\n Press Ctrl+C to stop",
    ):
        print(startup_line)
    HTTPServer(('localhost', PORT), MonitorHandler).serve_forever()

View File

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
GitHub Actions Workflow Health Analyzer
Comprehensive health report for BlackRoad-OS organization workflows
"""
import json
import subprocess
import sys
from collections import defaultdict, Counter
from datetime import datetime
from typing import Dict, List, Tuple
class WorkflowHealthAnalyzer:
def __init__(self):
self.repos = [
"blackroad-os-infra",
"blackroad",
"blackroad-app",
"blackroad-agents",
"blackroad-os-brand",
"BlackRoad-Public",
"BlackRoad-Private"
]
self.all_runs = []
self.workflow_stats = defaultdict(lambda: {"success": 0, "failure": 0, "cancelled": 0, "total": 0})
self.repo_stats = defaultdict(lambda: {"success": 0, "failure": 0, "cancelled": 0, "total": 0})
def fetch_workflow_runs(self, repo: str, limit: int = 100) -> List[Dict]:
"""Fetch workflow runs for a repository"""
cmd = [
"gh", "run", "list",
"--repo", f"BlackRoad-OS/{repo}",
"--limit", str(limit),
"--json", "conclusion,name,status,startedAt,workflowName,databaseId"
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
runs = json.loads(result.stdout)
# Add repo name to each run
for run in runs:
run['repo'] = repo
return runs
else:
print(f"⚠️ Warning: Failed to fetch runs for {repo}: {result.stderr}", file=sys.stderr)
return []
except Exception as e:
print(f"⚠️ Error fetching {repo}: {str(e)}", file=sys.stderr)
return []
def collect_all_runs(self):
"""Collect workflow runs from all repositories"""
print("🔍 Fetching workflow runs from all repositories...\n")
for repo in self.repos:
print(f" 📦 {repo}...", end=" ")
runs = self.fetch_workflow_runs(repo)
self.all_runs.extend(runs)
print(f"{len(runs)} runs")
print(f"\n✅ Total runs collected: {len(self.all_runs)}\n")
def analyze_workflows(self):
"""Analyze workflow statistics"""
for run in self.all_runs:
workflow_name = run.get('workflowName', 'Unknown')
conclusion = run.get('conclusion', 'in_progress')
repo = run.get('repo', 'unknown')
# Skip queued/in_progress runs for statistics
if not conclusion or conclusion == '':
conclusion = 'in_progress'
continue
# Update workflow stats
self.workflow_stats[workflow_name][conclusion] += 1
self.workflow_stats[workflow_name]['total'] += 1
# Update repo stats
self.repo_stats[repo][conclusion] += 1
self.repo_stats[repo]['total'] += 1
def calculate_success_rate(self, stats: Dict) -> float:
"""Calculate success rate percentage"""
total = stats['total']
if total == 0:
return 0.0
return (stats['success'] / total) * 100
def get_failure_rate(self, stats: Dict) -> float:
"""Calculate failure rate percentage"""
total = stats['total']
if total == 0:
return 0.0
return (stats['failure'] / total) * 100
def identify_problematic_workflows(self, threshold: float = 50.0) -> List[Tuple[str, Dict, float]]:
"""Identify workflows with failure rates above threshold"""
problematic = []
for workflow, stats in self.workflow_stats.items():
if stats['total'] < 3: # Skip workflows with very few runs
continue
failure_rate = self.get_failure_rate(stats)
if failure_rate >= threshold:
problematic.append((workflow, stats, failure_rate))
# Sort by failure rate descending
problematic.sort(key=lambda x: x[2], reverse=True)
return problematic
def identify_always_failing_workflows(self) -> List[Tuple[str, Dict]]:
"""Identify workflows that ALWAYS fail (100% failure rate)"""
always_failing = []
for workflow, stats in self.workflow_stats.items():
if stats['total'] >= 3 and stats['success'] == 0 and stats['failure'] > 0:
always_failing.append((workflow, stats))
# Sort by total runs descending
always_failing.sort(key=lambda x: x[1]['total'], reverse=True)
return always_failing
def identify_self_healing_status(self) -> Dict:
    """Report on every workflow whose name suggests a self-healing job.

    Returns a mapping of workflow name to {'stats', 'success_rate',
    'status'} where status is 'working' above 50% success and
    'stuck/broken' otherwise.
    """
    markers = ("self-healing", "auto-heal", "auto-fix", "autonomous", "self heal")
    report = {}
    for name, stats in self.workflow_stats.items():
        lowered = name.lower()
        if not any(marker in lowered for marker in markers):
            continue
        rate = self.calculate_success_rate(stats)
        report[name] = {
            'stats': stats,
            'success_rate': rate,
            'status': 'working' if rate > 50 else 'stuck/broken',
        }
    return report
def identify_orphaned_workflows(self) -> List[str]:
    """Identify workflows that exist in .github/workflows but never run.

    Queries the gh CLI for active workflows in blackroad-os-infra and
    compares them against workflows that actually appear in collected run
    data. Workflows with fewer than 2 runs are flagged as "(rarely runs)".

    Returns:
        A list of workflow names (possibly annotated), or [] when the gh
        call fails or returns a non-zero exit code.
    """
    # Get all workflows from blackroad-os-infra
    # NOTE(review): only checks this one repo, not every repo analyzed.
    cmd = [
        "gh", "workflow", "list",
        "--repo", "BlackRoad-OS/blackroad-os-infra",
        "--json", "name,state"
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            all_workflows = json.loads(result.stdout)
            workflow_names = {w['name'] for w in all_workflows if w.get('state') == 'active'}
            # Compare with workflows that have actually run
            run_workflows = set(self.workflow_stats.keys())
            # Orphaned = defined but never run (or very rarely)
            orphaned = []
            for wf in workflow_names:
                if wf not in run_workflows:
                    orphaned.append(wf)
                elif self.workflow_stats[wf]['total'] < 2:  # Less than 2 runs
                    orphaned.append(f"{wf} (rarely runs)")
            return orphaned
    except Exception as e:
        # Broad catch is deliberate best-effort: a missing/failed gh CLI
        # should not abort the whole report.
        print(f"⚠️ Could not fetch workflow list: {e}", file=sys.stderr)
    # Reached when gh exits non-zero or raises.
    return []
def generate_report(self):
    """Generate comprehensive health report.

    Prints a multi-section summary to stdout: overall statistics, a
    per-repository breakdown, the worst-failing workflows, workflows that
    always fail, self-healing workflow status, failure-pattern analysis,
    and actionable recommendations. Purely presentational; all numbers
    come from the stats maps filled in by analyze_workflows().
    """
    print("=" * 80)
    print("🏥 GITHUB ACTIONS WORKFLOW HEALTH REPORT")
    print(" BlackRoad-OS Organization")
    print("=" * 80)
    print()
    # Overall statistics -- only runs with a conclusion count toward totals.
    total_runs = len([r for r in self.all_runs if r.get('conclusion')])
    total_success = sum(stats['success'] for stats in self.workflow_stats.values())
    total_failure = sum(stats['failure'] for stats in self.workflow_stats.values())
    total_cancelled = sum(stats['cancelled'] for stats in self.workflow_stats.values())
    overall_success_rate = (total_success / total_runs * 100) if total_runs > 0 else 0
    overall_failure_rate = (total_failure / total_runs * 100) if total_runs > 0 else 0
    print("📊 OVERALL STATISTICS")
    print("-" * 80)
    print(f"Total Workflow Runs Analyzed: {total_runs}")
    print(f"Total Unique Workflows: {len(self.workflow_stats)}")
    # NOTE(review): self.repos is initialized elsewhere in this class (not
    # shown here); assumed to be the list of repositories scanned.
    print(f"Total Repositories: {len(self.repos)}")
    print()
    print(f"✅ Success: {total_success:4d} runs ({overall_success_rate:5.1f}%)")
    print(f"❌ Failure: {total_failure:4d} runs ({overall_failure_rate:5.1f}%)")
    print(f"🚫 Cancelled: {total_cancelled:4d} runs")
    print()
    # Repository breakdown, busiest repo first.
    print("📦 PER-REPOSITORY BREAKDOWN")
    print("-" * 80)
    for repo in sorted(self.repo_stats.keys(), key=lambda r: self.repo_stats[r]['total'], reverse=True):
        stats = self.repo_stats[repo]
        success_rate = self.calculate_success_rate(stats)
        failure_rate = self.get_failure_rate(stats)
        # Traffic-light thresholds: >70% green, >40% yellow, else red.
        status_emoji = "🟢" if success_rate > 70 else "🟡" if success_rate > 40 else "🔴"
        print(f"{status_emoji} {repo:30s} Total: {stats['total']:3d} "
              f"Success: {success_rate:5.1f}% Failure: {failure_rate:5.1f}%")
    print()
    # Top 10 failing workflows (>=30% failure rate).
    print("🔥 TOP 10 MOST PROBLEMATIC WORKFLOWS")
    print("-" * 80)
    problematic = self.identify_problematic_workflows(threshold=30.0)[:10]
    if not problematic:
        print("✅ No highly problematic workflows found!")
    else:
        for i, (workflow, stats, failure_rate) in enumerate(problematic, 1):
            print(f"{i:2d}. {workflow}")
            print(f" Failure Rate: {failure_rate:5.1f}% ({stats['failure']}/{stats['total']} runs)")
            print(f" Success: {stats['success']} Failure: {stats['failure']} Cancelled: {stats['cancelled']}")
    print()
    # Always failing workflows
    print("💀 ALWAYS FAILING WORKFLOWS (100% Failure Rate)")
    print("-" * 80)
    always_failing = self.identify_always_failing_workflows()
    if not always_failing:
        print("✅ No workflows with 100% failure rate!")
    else:
        for workflow, stats in always_failing[:15]:  # Top 15
            print(f"{workflow}")
            print(f" {stats['failure']} failures, 0 successes")
    print()
    # Self-healing workflow status
    print("🤖 SELF-HEALING WORKFLOW STATUS")
    print("-" * 80)
    self_healing = self.identify_self_healing_status()
    if not self_healing:
        print("⚠️ No self-healing workflows detected")
    else:
        for workflow, data in self_healing.items():
            stats = data['stats']
            status_emoji = "" if data['status'] == 'working' else "🔴"
            print(f"{status_emoji} {workflow}")
            print(f" Success Rate: {data['success_rate']:5.1f}% "
                  f"Status: {data['status'].upper()}")
            print(f" Total: {stats['total']} Success: {stats['success']} "
                  f"Failure: {stats['failure']}")
    print()
    # Pattern analysis
    print("🔍 FAILURE PATTERN ANALYSIS")
    print("-" * 80)
    self.analyze_failure_patterns()
    print()
    # Recommendations
    print("💡 RECOMMENDATIONS")
    print("-" * 80)
    self.generate_recommendations(always_failing, self_healing)
    print()
def analyze_failure_patterns(self):
    """Print failure rates grouped by workflow category.

    Each workflow is bucketed into the first category whose keyword list
    matches its (lower-cased) name; unmatched workflows land in 'other'.
    Output is one traffic-light line per non-empty category.
    """
    # Order matters: the first matching category wins.
    categories = {
        'deployment': ['deploy', 'cloudflare', 'railway', 'pages', 'multi-cloud'],
        'ci_cd': ['ci', 'test', 'build', 'lint'],
        'security': ['security', 'codeql', 'scan', 'compliance'],
        'automation': ['bot', 'auto', 'sync', 'label'],
        'self_healing': ['self-healing', 'auto-heal', 'auto-fix'],
        'monitoring': ['health', 'dashboard', 'monitor', 'observability']
    }
    bucket_totals = defaultdict(lambda: {'success': 0, 'failure': 0, 'total': 0})
    for name, stats in self.workflow_stats.items():
        lowered = name.lower()
        bucket = next(
            (cat for cat, keywords in categories.items()
             if any(kw in lowered for kw in keywords)),
            'other',
        )
        for field in ('success', 'failure', 'total'):
            bucket_totals[bucket][field] += stats[field]
    print("Workflow Category Failure Rates:")
    print()
    for bucket in sorted(bucket_totals.keys()):
        totals = bucket_totals[bucket]
        if totals['total'] == 0:
            continue
        failure_rate = (totals['failure'] / totals['total'] * 100)
        status_emoji = "🟢" if failure_rate < 30 else "🟡" if failure_rate < 60 else "🔴"
        print(f"{status_emoji} {bucket.replace('_', ' ').title():20s} "
              f"Failure Rate: {failure_rate:5.1f}% "
              f"({totals['failure']}/{totals['total']} runs)")
def generate_recommendations(self, always_failing, self_healing):
    """Generate actionable recommendations.

    Args:
        always_failing: Output of identify_always_failing_workflows() --
            list of (workflow, stats) pairs with 100% failure rate.
        self_healing: Output of identify_self_healing_status() -- mapping
            of workflow name to status info.

    Prints a numbered list of recommendations, or a "relatively healthy"
    message when no trigger condition fires.
    """
    recommendations = []
    # Check overall health
    total_runs = sum(stats['total'] for stats in self.workflow_stats.values())
    total_failures = sum(stats['failure'] for stats in self.workflow_stats.values())
    overall_failure_rate = (total_failures / total_runs * 100) if total_runs > 0 else 0
    # Trigger 1: catastrophic overall failure rate.
    if overall_failure_rate > 70:
        recommendations.append(
            "🚨 CRITICAL: Overall failure rate is {:.1f}%. Immediate action required!\n"
            " - Disable non-critical workflows temporarily\n"
            " - Focus on fixing core CI/CD pipelines first\n"
            " - Review workflow configurations for common issues".format(overall_failure_rate)
        )
    # Trigger 2: self-healing workflows that are themselves broken.
    if self_healing:
        broken_self_healing = [w for w, d in self_healing.items() if d['status'] == 'stuck/broken']
        if broken_self_healing:
            recommendations.append(
                "🤖 Self-healing workflows are BROKEN and may be in infinite loops:\n"
                " {}\n"
                " - Disable these workflows immediately\n"
                " - Review logs for root cause\n"
                " - Fix underlying issues before re-enabling".format(
                    '\n '.join(f'- {w}' for w in broken_self_healing)
                )
            )
    # Trigger 3: a large number of 100%-failing workflows.
    if len(always_failing) > 10:
        recommendations.append(
            f"{len(always_failing)} workflows have 100% failure rate:\n"
            " - Archive or delete workflows that are no longer needed\n"
            " - Fix critical workflows (deployment, CI/CD) first\n"
            " - Consider disabling broken automation until fixed"
        )
    # Trigger 4: deployment-specific failure spike.
    deployment_failures = sum(
        stats['failure'] for wf, stats in self.workflow_stats.items()
        if 'deploy' in wf.lower() or 'cloudflare' in wf.lower()
    )
    if deployment_failures > 20:
        recommendations.append(
            f"☁️ {deployment_failures} deployment workflow failures detected:\n"
            " - Check Cloudflare/Railway API credentials\n"
            " - Verify DNS configurations\n"
            " - Review wrangler.toml files for correctness"
        )
    # Print recommendations
    if recommendations:
        for i, rec in enumerate(recommendations, 1):
            print(f"{i}. {rec}")
            print()
    else:
        print("✅ System is relatively healthy! Focus on addressing the top failing workflows.")
        print()
def export_json_report(self, filename: str = "/Users/alexa/github-actions-health-report.json"):
    """Export detailed report as JSON.

    Serializes the collected stats plus the always-failing and
    self-healing analyses to *filename* (pretty-printed, indent=2).

    Args:
        filename: Destination path; defaults to a fixed report location.
    """
    report = {
        'generated_at': datetime.now().isoformat(),
        'total_runs': len(self.all_runs),
        'total_workflows': len(self.workflow_stats),
        # dict() strips the defaultdict wrapper so json.dump sees plain maps.
        'workflow_stats': dict(self.workflow_stats),
        'repo_stats': dict(self.repo_stats),
        'always_failing': [
            {'workflow': wf, 'stats': stats}
            for wf, stats in self.identify_always_failing_workflows()
        ],
        'self_healing_status': self.identify_self_healing_status()
    }
    with open(filename, 'w') as f:
        json.dump(report, f, indent=2)
    # BUG FIX: the original f-string printed the literal text "(unknown)"
    # instead of interpolating the actual destination path.
    print(f"📄 Detailed JSON report exported to: {filename}")
def main():
    """Entry point: collect run data, analyze it, and emit both reports.

    Orchestrates the WorkflowHealthAnalyzer pipeline end to end: fetch run
    data via the gh CLI, aggregate statistics, print the console report,
    and write the JSON report to its default location.
    """
    analyzer = WorkflowHealthAnalyzer()
    # Collect data
    analyzer.collect_all_runs()
    # Analyze
    analyzer.analyze_workflows()
    # Generate report
    analyzer.generate_report()
    # Export JSON
    analyzer.export_json_report()


if __name__ == "__main__":
    main()

300
scripts/python/infra-pager.py Executable file
View File

@@ -0,0 +1,300 @@
#!/usr/bin/env python3
"""
BlackRoad Infrastructure Pager
Physical YES/NO approval system for ALL operations
Connects ESP32 operator to:
- GitHub deployments
- Cloudflare deployments
- Railway services
- Pi cluster operations
- Agent deployments
"""
import serial
import time
import requests
import json
import sys
from datetime import datetime
DEVICE = "/dev/cu.usbserial-110"
BAUD = 115200
# Infrastructure endpoints
INFRA = {
"github_orgs": ["BlackRoad-OS", "BlackRoad-AI", "BlackRoad-Cloud"],
"cloudflare_zones": 16,
"cloudflare_pages": 8,
"railway_projects": 12,
"pi_cluster": ["192.168.4.38", "192.168.4.64", "192.168.4.99"],
"total_repos": 66,
"total_agents": 0 # Will grow to 30,000
}
class InfraPager:
    """Physical approval gate for infrastructure operations.

    Wraps the ESP32 "operator" device on the serial port DEVICE. Every
    destructive operation (deploys, scaling, cluster restarts, PR merges)
    blocks in ask_approval() until the operator presses the hardware YES
    or NO button. Approval/rejection counts are tallied per session.
    """

    def __init__(self):
        # May raise serial.SerialException when the device is absent;
        # main() catches that and reports it to the user.
        self.ser = serial.Serial(DEVICE, BAUD, timeout=1)
        time.sleep(2)  # Wait for ESP32 reset
        # Clear startup messages
        if self.ser.in_waiting > 0:
            self.ser.read(self.ser.in_waiting)
        print("🌌 BlackRoad Infrastructure Pager")
        print("=" * 60)
        print(f"Connected: {DEVICE}")
        print(f"GitHub Orgs: {len(INFRA['github_orgs'])}")
        print(f"Repos: {INFRA['total_repos']}")
        print(f"Cloudflare Zones: {INFRA['cloudflare_zones']}")
        print(f"Railway Projects: {INFRA['railway_projects']}")
        print(f"Pi Cluster Nodes: {len(INFRA['pi_cluster'])}")
        print("=" * 60)
        print()
        # Session state for show_stats(); pending_operations is currently
        # unused by the methods below.
        self.pending_operations = []
        self.approved_count = 0
        self.rejected_count = 0

    def send_command(self, cmd):
        """Send command to ESP32.

        Writes *cmd* newline-terminated, waits briefly for the firmware to
        answer, and returns the stripped reply ("" when nothing arrived).
        """
        self.ser.write(f"{cmd}\n".encode())
        time.sleep(0.3)
        if self.ser.in_waiting > 0:
            response = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
            return response.strip()
        return ""

    def ask_approval(self, operation, context=""):
        """Ask for YES/NO approval on hardware.

        Blocks until the serial stream contains "APPROVED" or "REJECTED",
        updating the session counters. Returns True on approval, False on
        rejection.
        """
        # NOTE(review): timestamp is captured when the request is shown, so
        # the APPROVED/REJECTED line reports request time, not decision time.
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"\n[{timestamp}] 🚨 APPROVAL REQUIRED")
        print(f"Operation: {operation}")
        if context:
            print(f"Context: {context}")
        print()
        print("⚡ Waiting for PHYSICAL approval on USB-C operator...")
        print(" Press YES button to approve")
        print(" Press NO button to reject")
        print()
        # Wait for YES or NO from hardware; poll at 10 Hz to avoid
        # busy-waiting on the serial port.
        while True:
            if self.ser.in_waiting > 0:
                response = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
                if "APPROVED" in response:
                    print(f"✅ [{timestamp}] APPROVED by operator")
                    self.approved_count += 1
                    return True
                elif "REJECTED" in response:
                    print(f"❌ [{timestamp}] REJECTED by operator")
                    self.rejected_count += 1
                    return False
            time.sleep(0.1)

    def deploy_to_cloudflare(self, project_name):
        """Deploy to Cloudflare Pages (gated; deployment itself is a stub)."""
        approved = self.ask_approval(
            f"Deploy {project_name} to Cloudflare Pages",
            f"Zone: blackroad.io | Pages: {INFRA['cloudflare_pages']}"
        )
        if approved:
            print(f"🚀 Deploying {project_name}...")
            # Actual deployment would happen here
            print(f"{project_name} deployed to Cloudflare")
            return True
        else:
            print(f"⏸️ Deployment cancelled: {project_name}")
            return False

    def scale_railway_service(self, service_name, instances):
        """Scale Railway service (gated; scaling itself is a stub).

        NOTE(review): interactive_mode() passes *instances* as a string
        from input() -- fine for display, but convert before real use.
        """
        approved = self.ask_approval(
            f"Scale {service_name} to {instances} instances",
            f"Railway Projects: {INFRA['railway_projects']}"
        )
        if approved:
            print(f"📈 Scaling {service_name} to {instances} instances...")
            # Actual scaling would happen here
            print(f"{service_name} scaled successfully")
            return True
        else:
            print(f"⏸️ Scaling cancelled: {service_name}")
            return False

    def deploy_agents(self, count, target="all"):
        """Deploy AI agents (gated; mutates the global INFRA tally on approval)."""
        approved = self.ask_approval(
            f"Deploy {count} AI agents",
            f"Target: {target} | Current: {INFRA['total_agents']}"
        )
        if approved:
            print(f"🤖 Deploying {count} agents to {target}...")
            # Actual agent deployment would happen here
            INFRA['total_agents'] += count
            print(f"{count} agents deployed (Total: {INFRA['total_agents']})")
            return True
        else:
            print(f"⏸️ Agent deployment cancelled")
            return False

    def restart_pi_cluster(self):
        """Restart Pi cluster (gated; the per-node restart is a stub)."""
        approved = self.ask_approval(
            "Restart entire Pi cluster",
            f"Nodes: {', '.join(INFRA['pi_cluster'])}"
        )
        if approved:
            print(f"🔄 Restarting Pi cluster...")
            for ip in INFRA['pi_cluster']:
                print(f" → Restarting {ip}...")
            print(f"✅ Pi cluster restarted")
            return True
        else:
            print(f"⏸️ Cluster restart cancelled")
            return False

    def merge_pr(self, repo, pr_number):
        """Merge GitHub PR (gated; the gh CLI merge itself is a stub)."""
        approved = self.ask_approval(
            f"Merge PR #{pr_number}",
            f"Repo: {repo}"
        )
        if approved:
            print(f"🔀 Merging PR #{pr_number} in {repo}...")
            # Actual merge would happen here via gh CLI
            print(f"✅ PR #{pr_number} merged")
            return True
        else:
            print(f"⏸️ PR merge cancelled")
            return False

    def show_stats(self):
        """Print the running approval/rejection tally for this session."""
        print()
        print("=" * 60)
        print("📊 Infrastructure Pager Statistics")
        print("=" * 60)
        print(f"Total Approvals: {self.approved_count}")
        print(f"Total Rejections: {self.rejected_count}")
        # max(1, ...) avoids ZeroDivisionError before any decision is made.
        print(f"Approval Rate: {self.approved_count / max(1, self.approved_count + self.rejected_count) * 100:.1f}%")
        print(f"Active Agents: {INFRA['total_agents']}")
        print("=" * 60)

    def interactive_mode(self):
        """Menu loop dispatching to the gated operations until '0' or Ctrl+C."""
        print()
        print("🎯 Interactive Mode - Choose operation:")
        print()
        print("1. Deploy to Cloudflare")
        print("2. Scale Railway service")
        print("3. Deploy AI agents")
        print("4. Restart Pi cluster")
        print("5. Merge GitHub PR")
        print("6. Show statistics")
        print("7. Test hardware (PING)")
        print("0. Exit")
        print()
        while True:
            try:
                choice = input("Select operation (0-7): ").strip()
                if choice == "0":
                    print("👋 Exiting pager...")
                    break
                elif choice == "1":
                    project = input("Project name: ")
                    self.deploy_to_cloudflare(project)
                elif choice == "2":
                    service = input("Service name: ")
                    # NOTE(review): instances is kept as a string here.
                    instances = input("Number of instances: ")
                    self.scale_railway_service(service, instances)
                elif choice == "3":
                    count = int(input("Number of agents: "))
                    target = input("Target (e.g., 'all', 'BlackRoad-OS'): ")
                    self.deploy_agents(count, target)
                elif choice == "4":
                    self.restart_pi_cluster()
                elif choice == "5":
                    repo = input("Repository: ")
                    pr = input("PR number: ")
                    self.merge_pr(repo, pr)
                elif choice == "6":
                    self.show_stats()
                elif choice == "7":
                    print("🏓 Testing hardware...")
                    response = self.send_command("PING")
                    print(f"Response: {response}")
            except KeyboardInterrupt:
                print("\n👋 Exiting pager...")
                break
            except Exception as e:
                # Keep the menu alive on bad input (e.g. non-numeric agent count).
                print(f"❌ Error: {e}")

    def close(self):
        """Print the session summary and release the serial port."""
        self.show_stats()
        self.ser.close()
        print("\n✅ Pager disconnected")
def main():
    """Main entry point.

    Runs a scripted demo of each gated operation, then drops into the
    interactive menu. Exits with status 1 when the serial device cannot
    be opened, and 0 on Ctrl+C.
    """
    try:
        pager = InfraPager()
        # Demo sequence
        print("🎬 Running demo sequence...\n")
        time.sleep(1)
        # 1. Deploy dashboard
        pager.deploy_to_cloudflare("blackroad-monitoring-dashboard")
        # 2. Scale API
        pager.scale_railway_service("blackroad-api", 5)
        # 3. Deploy agents
        pager.deploy_agents(100, "BlackRoad-OS")
        # 4. Merge PR
        pager.merge_pr("BlackRoad-OS/blackroad-os-operator", 42)
        # Show stats
        pager.show_stats()
        # Interactive mode
        print("\n" + "=" * 60)
        print("Demo complete! Starting interactive mode...")
        print("=" * 60)
        pager.interactive_mode()
        pager.close()
    except serial.SerialException as e:
        print(f"❌ Serial error: {e}")
        print(f"Is ESP32 connected to {DEVICE}?")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\n👋 Interrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
BlackRoad Memory Index Auto-Update Daemon
Watches journal file and automatically updates search index when new entries are added.
"""
import os
import sys
import time
import signal
import subprocess
from pathlib import Path
from datetime import datetime
import hashlib
# Configuration
MEMORY_DIR = Path.home() / ".blackroad" / "memory"
JOURNAL_FILE = MEMORY_DIR / "journals" / "master-journal.jsonl"
INDEX_DB = MEMORY_DIR / "memory-index.db"
PID_FILE = MEMORY_DIR / "memory-index-daemon.pid"
LOG_FILE = MEMORY_DIR / "memory-index-daemon.log"
INDEXER_SCRIPT = Path.home() / "memory-indexer.py"
# Settings
CHECK_INTERVAL = 5 # seconds between checks
BATCH_DELAY = 2 # seconds to wait before indexing (batch multiple writes)
# ANSI Colors
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
CYAN = '\033[0;36m'
RED = '\033[0;31m'
NC = '\033[0m'
class MemoryIndexDaemon:
    """Watches the journal file and re-runs the indexer when it changes.

    Change detection polls JOURNAL_FILE's size and mtime every
    CHECK_INTERVAL seconds. Updates are batched: after the first detected
    change the daemon waits BATCH_DELAY seconds before indexing, so rapid
    successive writes trigger a single indexer run.
    """

    def __init__(self):
        self.running = False            # main-loop flag, cleared by signal_handler
        self.last_size = 0              # last observed journal size (bytes)
        self.last_mtime = 0             # last observed journal mtime
        self.pending_update = False     # an index update is scheduled
        self.update_scheduled_at = 0    # wall-clock time to run the update

    def log(self, message, level="INFO"):
        """Log message to file and optionally stdout"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] [{level}] {message}\n"
        # Write to log file
        try:
            with open(LOG_FILE, 'a') as f:
                f.write(log_entry)
        except Exception as e:
            # Logging must never kill the daemon; fall back to stderr.
            print(f"Failed to write log: {e}", file=sys.stderr)
        # Also print if running in foreground
        if not self.is_background():
            print(f"{CYAN}[{level}]{NC} {message}")

    def is_background(self):
        """Check if running in background"""
        # NOTE(review): comparing the parent PID with this process's group
        # id is a heuristic -- it holds after the fork()+setsid() done in
        # main(), but is not a general background test. Confirm before reuse.
        return os.getppid() != os.getpgrp()

    def write_pid(self):
        """Write PID file"""
        try:
            with open(PID_FILE, 'w') as f:
                f.write(str(os.getpid()))
            self.log(f"PID file created: {PID_FILE}")
        except Exception as e:
            self.log(f"Failed to create PID file: {e}", "ERROR")

    def remove_pid(self):
        """Remove PID file"""
        try:
            if PID_FILE.exists():
                PID_FILE.unlink()
                self.log("PID file removed")
        except Exception as e:
            self.log(f"Failed to remove PID file: {e}", "WARNING")

    def check_already_running(self):
        """Check if daemon is already running.

        Reads the PID file and probes the process with signal 0; a stale
        PID file (process gone) is removed. Returns True only when a live
        daemon process exists.
        """
        if not PID_FILE.exists():
            return False
        try:
            with open(PID_FILE, 'r') as f:
                pid = int(f.read().strip())
            # Check if process exists
            try:
                os.kill(pid, 0)  # Doesn't actually kill, just checks
                return True
            except OSError:
                # Process doesn't exist, remove stale PID file
                PID_FILE.unlink()
                return False
        except Exception:
            return False

    def get_file_info(self):
        """Get journal file size and modification time.

        Returns (0, 0) when the journal does not exist yet.
        """
        if not JOURNAL_FILE.exists():
            return 0, 0
        stat = JOURNAL_FILE.stat()
        return stat.st_size, stat.st_mtime

    def run_indexer(self):
        """Run memory-indexer.py update.

        Invokes the indexer as a subprocess with a 30s timeout and logs a
        one-line summary of what was indexed. Failures are logged, never
        raised -- the watch loop must keep running.
        """
        try:
            self.log("Running index update...")
            result = subprocess.run(
                [sys.executable, str(INDEXER_SCRIPT), 'update'],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                # Parse output for indexed count
                output = result.stdout
                if 'new entries' in output:
                    for line in output.split('\n'):
                        if 'Indexed' in line and 'new entries' in line:
                            self.log(f"{line.strip()}")
                            break
                else:
                    self.log("✓ Index updated (no new entries)")
            else:
                self.log(f"Indexer failed: {result.stderr}", "ERROR")
        except subprocess.TimeoutExpired:
            self.log("Indexer timeout", "ERROR")
        except Exception as e:
            self.log(f"Indexer error: {e}", "ERROR")

    def check_and_update(self):
        """Check for changes and schedule update if needed.

        Called once per poll cycle. A detected size/mtime change schedules
        an indexer run BATCH_DELAY seconds in the future; further changes
        during that window are absorbed into the same run.
        """
        current_size, current_mtime = self.get_file_info()
        # Check if file has changed
        if current_size != self.last_size or current_mtime != self.last_mtime:
            if not self.pending_update:
                self.log(f"Journal changed (size: {current_size} bytes)")
                self.pending_update = True
                self.update_scheduled_at = time.time() + BATCH_DELAY
            self.last_size = current_size
            self.last_mtime = current_mtime
        # Check if it's time to run scheduled update
        if self.pending_update and time.time() >= self.update_scheduled_at:
            self.run_indexer()
            self.pending_update = False
            self.update_scheduled_at = 0

    def signal_handler(self, signum, frame):
        """Handle shutdown signals (SIGTERM/SIGINT) by ending the main loop."""
        self.log(f"Received signal {signum}, shutting down...")
        self.running = False

    def start(self, foreground=False):
        """Start the daemon.

        Refuses to start when another instance is running or the indexer
        script is missing. Runs the poll loop until a shutdown signal or
        Ctrl+C, then removes the PID file. Returns 0 on clean exit, 1 on
        startup failure.
        """
        # Check if already running
        if self.check_already_running():
            print(f"{RED}[✗]{NC} Daemon already running")
            return 1
        # Check dependencies
        if not INDEXER_SCRIPT.exists():
            print(f"{RED}[✗]{NC} Indexer script not found: {INDEXER_SCRIPT}")
            return 1
        if not JOURNAL_FILE.exists():
            # Not fatal: get_file_info() reports (0, 0) until it appears.
            print(f"{YELLOW}[!]{NC} Journal file not found, will wait for it")
        # Setup signal handlers
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)
        # Write PID file
        self.write_pid()
        # Initialize state
        self.last_size, self.last_mtime = self.get_file_info()
        self.running = True
        self.log("=" * 60)
        self.log("Memory Index Daemon Started")
        self.log(f"Watching: {JOURNAL_FILE}")
        self.log(f"Check interval: {CHECK_INTERVAL}s")
        self.log(f"Batch delay: {BATCH_DELAY}s")
        self.log("=" * 60)
        if not foreground:
            print(f"{GREEN}[✓]{NC} Memory index daemon started")
            print(f"{CYAN}[→]{NC} PID: {os.getpid()}")
            print(f"{CYAN}[→]{NC} Log: {LOG_FILE}")
        # Main loop
        try:
            while self.running:
                self.check_and_update()
                time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            self.log("Interrupted by user")
        finally:
            self.log("Daemon stopped")
            self.remove_pid()
        return 0
def stop_daemon():
    """Send SIGTERM to the daemon recorded in PID_FILE and wait for exit.

    Returns:
        0 on success, 1 when no daemon is running or signalling fails.
    """
    if not PID_FILE.exists():
        print(f"{YELLOW}[!]{NC} Daemon not running (no PID file)")
        return 1
    try:
        with open(PID_FILE, 'r') as f:
            pid = int(f.read().strip())
        print(f"{CYAN}[→]{NC} Stopping daemon (PID: {pid})...")
        os.kill(pid, signal.SIGTERM)
        # Wait for process to exit (up to ~5s; signal 0 only probes for
        # existence, it does not deliver anything).
        for _ in range(10):
            try:
                os.kill(pid, 0)
                time.sleep(0.5)
            except OSError:
                break
        print(f"{GREEN}[✓]{NC} Daemon stopped")
        return 0
    except Exception as e:
        print(f"{RED}[✗]{NC} Failed to stop daemon: {e}")
        return 1
def status_daemon():
    """Check daemon status and show recent log lines.

    Probes the PID from PID_FILE with signal 0; a stale PID file is
    removed. Returns 0 when a live daemon is found, 1 otherwise.
    """
    if not PID_FILE.exists():
        print(f"{YELLOW}[STATUS]{NC} Daemon is not running")
        return 1
    try:
        with open(PID_FILE, 'r') as f:
            pid = int(f.read().strip())
        try:
            # Signal 0 checks for existence without affecting the process.
            os.kill(pid, 0)
            print(f"{GREEN}[STATUS]{NC} Daemon is running")
            print(f"{CYAN}[→]{NC} PID: {pid}")
            print(f"{CYAN}[→]{NC} Log: {LOG_FILE}")
            # Show recent log entries
            if LOG_FILE.exists():
                print(f"\n{CYAN}Recent log entries:{NC}")
                with open(LOG_FILE, 'r') as f:
                    lines = f.readlines()
                    for line in lines[-5:]:
                        print(f" {line.rstrip()}")
            return 0
        except OSError:
            print(f"{YELLOW}[STATUS]{NC} Daemon not running (stale PID file)")
            PID_FILE.unlink()
            return 1
    except Exception as e:
        print(f"{RED}[✗]{NC} Error checking status: {e}")
        return 1
def show_logs(follow=False, lines=20):
    """Display daemon log output, either the recent tail or streamed live.

    Args:
        follow: When True, stream new lines as they are appended (tail -f
            style) until Ctrl+C; otherwise print the last *lines* entries.
        lines: Tail length for the non-follow mode.

    Returns:
        0 on success, 1 when no log file exists.
    """
    if not LOG_FILE.exists():
        print(f"{YELLOW}[!]{NC} No log file found: {LOG_FILE}")
        return 1
    if not follow:
        # Show last N lines
        with open(LOG_FILE, 'r') as handle:
            history = handle.readlines()
        tail = history[-lines:] if len(history) > lines else history
        print(f"{CYAN}Last {len(tail)} log entries:{NC}\n")
        for entry in tail:
            print(entry.rstrip())
        return 0
    # Follow mode (like tail -f)
    try:
        with open(LOG_FILE, 'r') as handle:
            handle.seek(0, 2)  # jump to EOF so only new entries stream
            print(f"{CYAN}Following log (Ctrl+C to stop):{NC}\n")
            while True:
                entry = handle.readline()
                if entry:
                    print(entry.rstrip())
                else:
                    time.sleep(0.1)
    except KeyboardInterrupt:
        print(f"\n{CYAN}[→]{NC} Stopped following")
    return 0
def main():
    """CLI dispatcher: start/stop/status/logs/restart the daemon.

    Returns the exit code to pass to sys.exit(). The start and restart
    commands daemonize via fork()+setsid() unless --foreground is given.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Memory Index Auto-Update Daemon",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  memory-index-daemon.py start              # Start daemon in background
  memory-index-daemon.py start --foreground # Start in foreground (testing)
  memory-index-daemon.py stop               # Stop daemon
  memory-index-daemon.py status             # Check if running
  memory-index-daemon.py logs               # Show recent logs
  memory-index-daemon.py logs --follow      # Follow logs in real-time
        """
    )
    parser.add_argument('command', choices=['start', 'stop', 'status', 'logs', 'restart'],
                        help='Command to execute')
    parser.add_argument('--foreground', '-f', action='store_true',
                        help='Run in foreground (for start command)')
    parser.add_argument('--follow', action='store_true',
                        help='Follow logs in real-time (for logs command)')
    parser.add_argument('--lines', '-n', type=int, default=20,
                        help='Number of log lines to show (default: 20)')
    args = parser.parse_args()
    # Ensure memory directory exists
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    if args.command == 'start':
        daemon = MemoryIndexDaemon()
        if args.foreground:
            return daemon.start(foreground=True)
        else:
            # Daemonize: fork, let the parent return immediately, and
            # detach the child into its own session via setsid().
            pid = os.fork()
            if pid > 0:
                # Parent process
                return 0
            # Child process continues
            os.setsid()
            return daemon.start(foreground=False)
    elif args.command == 'stop':
        return stop_daemon()
    elif args.command == 'status':
        return status_daemon()
    elif args.command == 'logs':
        return show_logs(follow=args.follow, lines=args.lines)
    elif args.command == 'restart':
        print(f"{CYAN}[→]{NC} Restarting daemon...")
        stop_daemon()
        time.sleep(1)
        daemon = MemoryIndexDaemon()
        # Fork for background (same daemonization as 'start')
        pid = os.fork()
        if pid > 0:
            return 0
        os.setsid()
        return daemon.start(foreground=False)


if __name__ == '__main__':
    sys.exit(main())

348
scripts/python/memory-search.py Executable file
View File

@@ -0,0 +1,348 @@
#!/usr/bin/env python3
"""
BlackRoad Memory Search
Fast full-text search across 4,000+ PS-SHA-∞ memory entries
Supports text search, tag filtering, entity lookup, time-range queries
"""
import sqlite3
import sys
import argparse
from pathlib import Path
from datetime import datetime
import json
# Configuration
MEMORY_DIR = Path.home() / ".blackroad" / "memory"
INDEX_DB = MEMORY_DIR / "memory-index.db"
# ANSI Colors
BLUE = '\033[0;34m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
RED = '\033[0;31m'
PURPLE = '\033[0;35m'
CYAN = '\033[0;36m'
BOLD = '\033[1m'
DIM = '\033[2m'
NC = '\033[0m'
def highlight(text, term):
    """Return *text* with every case-insensitive match of *term* wrapped
    in bold-yellow ANSI codes (original casing preserved)."""
    if not term:
        return text
    # Local import keeps the module's top-level dependency list unchanged.
    import re
    wrap = lambda match: f"{YELLOW}{BOLD}{match.group(0)}{NC}"
    return re.sub(re.escape(term), wrap, text, flags=re.IGNORECASE)
def format_timestamp(ts):
    """Render an ISO-8601 timestamp as 'YYYY-MM-DD HH:MM:SS'.

    A trailing 'Z' is normalized to '+00:00' so fromisoformat() accepts
    it. Falls back to returning *ts* unchanged when it cannot be parsed.
    """
    try:
        dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        return dt.strftime('%Y-%m-%d %H:%M:%S')
    except (ValueError, AttributeError, TypeError):
        # Narrowed from a bare `except:` -- still tolerates malformed or
        # non-string input, but no longer swallows KeyboardInterrupt/SystemExit.
        return ts
def print_result(row, query_term="", show_hash=False, compact=False):
    """Print a single search result.

    Args:
        row: sqlite3.Row (or mapping) with timestamp, action, entity,
            details, and sha256 columns.
        query_term: Term to highlight inside the details text (full format).
        show_hash: Also print a truncated SHA256 (full format only).
        compact: One-line summary instead of the multi-line card.
    """
    timestamp = row['timestamp']
    action = row['action']
    entity = row['entity']
    details = row['details']
    sha256 = row['sha256']
    formatted_time = format_timestamp(timestamp)
    if compact:
        # Compact one-line format
        details_preview = details[:80] + "..." if len(details) > 80 else details
        # NOTE(review): the action/entity separator appears to have lost a
        # glyph (likely an arrow) in extraction -- confirm against the
        # original file before relying on this exact output format.
        print(f"{DIM}{formatted_time}{NC} {CYAN}{action}{NC}{GREEN}{entity}{NC} | {details_preview}")
    else:
        # Full format
        # NOTE(review): {'' * 80} renders an empty separator line; the
        # literal was probably a box-drawing character dropped in extraction.
        print(f"{PURPLE}{'' * 80}{NC}")
        print(f"{DIM}Time:{NC} {formatted_time}")
        print(f"{DIM}Action:{NC} {CYAN}{action}{NC}")
        print(f"{DIM}Entity:{NC} {GREEN}{entity}{NC}")
        print(f"{DIM}Details:{NC} {highlight(details, query_term)}")
        if show_hash:
            print(f"{DIM}SHA256:{NC} {sha256[:16]}...")
        print()
def search_text(query, limit=20, compact=False, show_hash=False):
    """Full-text search across all memories.

    Runs an FTS5 MATCH query against the memories_fts virtual table and
    prints up to *limit* hits, best-ranked first.

    Args:
        query: FTS5 match expression (plain words, phrases, or operators).
        limit: Maximum number of rows to display.
        compact: Print one-line results instead of full cards.
        show_hash: Also display a truncated SHA256 per hit.
    """
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    # Row objects allow column access by name in print_result().
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    # FTS5 full-text search; ORDER BY rank puts the best match first.
    cursor.execute("""
        SELECT timestamp, action, entity, details, sha256
        FROM memories_fts
        WHERE memories_fts MATCH ?
        ORDER BY rank
        LIMIT ?
    """, (query, limit))
    results = cursor.fetchall()
    count = len(results)
    if count == 0:
        print(f"{YELLOW}[!]{NC} No results found for: {BOLD}{query}{NC}")
        conn.close()
        return
    print(f"\n{BOLD}{GREEN}Found {count} result(s){NC} for: {BOLD}{query}{NC}\n")
    for row in results:
        print_result(row, query_term=query, show_hash=show_hash, compact=compact)
    # A full page suggests there may be more matches than *limit*.
    if count == limit:
        print(f"{DIM}Showing first {limit} results. Use --limit to see more.{NC}\n")
    conn.close()
def search_by_action(action, limit=20, compact=False):
    """Print the most recent memories recorded with exactly *action*."""
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    rows = conn.cursor().execute("""
        SELECT timestamp, action, entity, details, sha256
        FROM memories_meta
        WHERE action = ?
        ORDER BY timestamp DESC
        LIMIT ?
    """, (action, limit)).fetchall()
    if not rows:
        print(f"{YELLOW}[!]{NC} No memories with action: {BOLD}{action}{NC}")
    else:
        print(f"\n{BOLD}{GREEN}Found {len(rows)} result(s){NC} with action: {BOLD}{action}{NC}\n")
        for entry in rows:
            print_result(entry, compact=compact)
    conn.close()
def search_by_entity(entity, limit=20, compact=False):
    """Print recent memories whose entity field contains *entity* (substring)."""
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    # LIKE with surrounding wildcards gives case-insensitive substring match.
    cursor.execute("""
        SELECT timestamp, action, entity, details, sha256
        FROM memories_meta
        WHERE entity LIKE ?
        ORDER BY timestamp DESC
        LIMIT ?
    """, (f'%{entity}%', limit))
    matches = cursor.fetchall()
    if len(matches) == 0:
        print(f"{YELLOW}[!]{NC} No memories for entity: {BOLD}{entity}{NC}")
    else:
        print(f"\n{BOLD}{GREEN}Found {len(matches)} result(s){NC} for entity: {BOLD}{entity}{NC}\n")
        for record in matches:
            print_result(record, compact=compact)
    conn.close()
def search_by_tag(tag, limit=20, compact=False):
    """Search by tag.

    Joins memories_meta to the tags table on the memory's SHA256 and
    prints the most recent *limit* entries carrying exactly *tag*.
    """
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute("""
        SELECT m.timestamp, m.action, m.entity, m.details, m.sha256
        FROM memories_meta m
        JOIN tags t ON m.sha256 = t.memory_sha256
        WHERE t.tag = ?
        ORDER BY m.timestamp DESC
        LIMIT ?
    """, (tag, limit))
    results = cursor.fetchall()
    count = len(results)
    if count == 0:
        print(f"{YELLOW}[!]{NC} No memories with tag: {BOLD}#{tag}{NC}")
        conn.close()
        return
    print(f"\n{BOLD}{GREEN}Found {count} result(s){NC} with tag: {BOLD}#{tag}{NC}\n")
    for row in results:
        print_result(row, compact=compact)
    conn.close()
def list_actions():
    """Print every distinct action type with its entry count, most common first."""
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute("""
        SELECT action, COUNT(*) as count
        FROM memories_meta
        GROUP BY action
        ORDER BY count DESC
    """)
    rows = cursor.fetchall()
    print(f"\n{BOLD}{PURPLE}All Actions{NC} ({len(rows)} unique)\n")
    for entry in rows:
        print(f"{CYAN}{entry['action']:30}{NC} {DIM}({entry['count']} entries){NC}")
    print()
    conn.close()
def list_entities(limit=50):
    """List top entities.

    Prints the *limit* entities with the most memory entries, busiest
    first, each with its entry count.
    """
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute("""
        SELECT entity, COUNT(*) as count
        FROM memories_meta
        GROUP BY entity
        ORDER BY count DESC
        LIMIT ?
    """, (limit,))
    results = cursor.fetchall()
    print(f"\n{BOLD}{PURPLE}Top Entities{NC} (showing {len(results)})\n")
    for row in results:
        entity = row['entity']
        count = row['count']
        print(f"{GREEN}{entity:40}{NC} {DIM}({count} entries){NC}")
    print()
    conn.close()
def recent_memories(limit=10, compact=True):
    """Display the *limit* most recently recorded memory entries."""
    if not INDEX_DB.exists():
        print(f"{RED}[✗]{NC} Index not found. Run: ./memory-indexer.py rebuild")
        return
    conn = sqlite3.connect(str(INDEX_DB))
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT timestamp, action, entity, details, sha256
        FROM memories_meta
        ORDER BY timestamp DESC
        LIMIT ?
    """, (limit,)).fetchall()
    print(f"\n{BOLD}{PURPLE}Recent Memories{NC} (last {limit})\n")
    for entry in rows:
        print_result(entry, compact=compact)
    print()
    conn.close()
def main():
    """Parse CLI arguments and dispatch to the matching search mode.

    Exactly one mode runs per invocation, in priority order:
    --list-actions, --list-entities, --recent, --action, --entity,
    --tag, then the positional full-text query. With no arguments the
    help text is printed.
    """
    parser = argparse.ArgumentParser(
        description="BlackRoad Memory Search - Search PS-SHA-∞ memory entries",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  memory-search.py "PR templates"     # Full-text search
  memory-search.py --action completed # Search by action
  memory-search.py --entity blackroad-os # Search by entity
  memory-search.py --tag deployment   # Search by tag
  memory-search.py --recent 20        # Show 20 recent entries
  memory-search.py --list-actions     # List all actions
  memory-search.py --list-entities    # List top entities
Options:
  --compact Show compact one-line results
  --limit N Limit results (default: 20)
  --hash Show SHA256 hashes
"""
    )
    parser.add_argument('query', nargs='?', help='Search query for full-text search')
    parser.add_argument('--action', help='Search by action type')
    parser.add_argument('--entity', help='Search by entity')
    parser.add_argument('--tag', help='Search by tag')
    parser.add_argument('--recent', type=int, metavar='N', help='Show N recent memories')
    parser.add_argument('--list-actions', action='store_true', help='List all unique actions')
    parser.add_argument('--list-entities', action='store_true', help='List top entities')
    parser.add_argument('--compact', action='store_true', help='Show compact one-line results')
    parser.add_argument('--limit', type=int, default=20, help='Limit number of results')
    parser.add_argument('--hash', action='store_true', help='Show SHA256 hashes')
    args = parser.parse_args()
    # Determine which search to perform.
    # NOTE: --recent is an int, so it must be compared against None —
    # a plain truthiness check would silently ignore "--recent 0" and
    # fall through to the help text.
    if args.list_actions:
        list_actions()
    elif args.list_entities:
        list_entities(limit=50)
    elif args.recent is not None:
        recent_memories(limit=args.recent, compact=args.compact)
    elif args.action:
        search_by_action(args.action, limit=args.limit, compact=args.compact)
    elif args.entity:
        search_by_entity(args.entity, limit=args.limit, compact=args.compact)
    elif args.tag:
        search_by_tag(args.tag, limit=args.limit, compact=args.compact)
    elif args.query:
        search_text(args.query, limit=args.limit, compact=args.compact, show_hash=args.hash)
    else:
        parser.print_help()
if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,430 @@
#!/usr/bin/env python3
"""
RoadChain Model Verification — SHA-2048 identity for every AI model on the system.
Scans macOS for Apple AI models (.mlmodelc) and frameworks, computes
SHA-2048 fingerprints, and registers them on-chain.
identity > provider — the model's hash IS the model.
Usage:
python3 roadchain-verify-models.py # Full scan + register
python3 roadchain-verify-models.py --stats # Show stats
python3 roadchain-verify-models.py --list # List all verified models
python3 roadchain-verify-models.py --verify # Re-verify all models
BlackRoad OS, Inc. 2026
"""
import sys
import time
from pathlib import Path
from roadchain.identity.model_registry import ModelRegistry
# ── Colors ────────────────────────────────────────────────────────────
PINK = "\033[38;5;205m"
AMBER = "\033[38;5;214m"
BLUE = "\033[38;5;69m"
VIOLET = "\033[38;5;135m"
GREEN = "\033[38;5;82m"
WHITE = "\033[1;37m"
DIM = "\033[2m"
RED = "\033[38;5;196m"
RESET = "\033[0m"
# ══════════════════════════════════════════════════════════════════════
# APPLE AI MODELS — CoreML compiled models (.mlmodelc)
# ══════════════════════════════════════════════════════════════════════
# Each entry is a 4-tuple: (model_name, absolute_path_to_.mlmodelc,
# category_tag, owning_framework). Paths are macOS system locations;
# missing paths are still indexed (see scan_and_register's INDEXED status).
APPLE_MODELS = [
    # ── Body Pose Detection ──
    ("2DHumanPoseDetectorFull", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/2DHumanPoseDetectorFull.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("2DHumanPoseDetectorFull_H13", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H13/2DHumanPoseDetectorFull_H13.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("2DHumanPoseDetectorFull_H14", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H14/2DHumanPoseDetectorFull_H14.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("2DHumanPoseDetectorFull_H15", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H15/2DHumanPoseDetectorFull_H15.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("3DHumanPoseLiftingSequenceFirstStage", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/3DHumanPoseLiftingSequenceFirstStage.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("3DHumanPoseLiftingSequenceFirstStage_H13", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H13/3DHumanPoseLiftingSequenceFirstStage_H13.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("3DHumanPoseLiftingSequenceFirstStage_H14", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H14/3DHumanPoseLiftingSequenceFirstStage_H14.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    ("3DHumanPoseLiftingSequenceFirstStage_H15", "/System/Library/PrivateFrameworks/AltruisticBodyPoseKit.framework/H15/3DHumanPoseLiftingSequenceFirstStage_H15.mlmodelc", "pose-detection", "AltruisticBodyPoseKit"),
    # ── Voice / Speech ──
    ("AcousticLID", "/System/Library/PrivateFrameworks/CoreSpeech.framework/Resources/AcousticLID.mlmodelc", "speech", "CoreSpeech"),
    ("AutoG2P8B", "/System/Library/PrivateFrameworks/VoiceActions.framework/Versions/A/Resources/AutoG2P8B.mlmodelc", "speech", "VoiceActions"),
    # ── Nearby Interaction / Antenna ──
    ("AntennaMask_1_NN_V5_Model_DeviceType_201", "/System/Library/NearbyInteractionBundles/BiasEstimatorResourceBundle.bundle/Contents/Resources/AntennaMask_1_NN_V5_Model_DeviceType_201.mlmodelc", "spatial", "NearbyInteraction"),
    ("AntennaMask_1_NN_V5_ScalingModel_DeviceType_201", "/System/Library/NearbyInteractionBundles/BiasEstimatorResourceBundle.bundle/Contents/Resources/AntennaMask_1_NN_V5_ScalingModel_DeviceType_201.mlmodelc", "spatial", "NearbyInteraction"),
    ("AntennaMask_2_NN_V5_Model_DeviceType_201", "/System/Library/NearbyInteractionBundles/BiasEstimatorResourceBundle.bundle/Contents/Resources/AntennaMask_2_NN_V5_Model_DeviceType_201.mlmodelc", "spatial", "NearbyInteraction"),
    ("AntennaMask_2_NN_V5_ScalingModel_DeviceType_201", "/System/Library/NearbyInteractionBundles/BiasEstimatorResourceBundle.bundle/Contents/Resources/AntennaMask_2_NN_V5_ScalingModel_DeviceType_201.mlmodelc", "spatial", "NearbyInteraction"),
    # ── Messaging / Contacts ──
    ("AutoSendModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/AutoSendModel.mlmodelc", "suggestions", "CoreSuggestions"),
    ("AutoSendPrivateNoOp", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/AutoSendPrivateNoOp.mlmodelc", "suggestions", "CoreSuggestions"),
    ("ContactRanker", "/System/Library/PrivateFrameworks/PeopleSuggester.framework/Versions/A/Resources/ContactRanker.mlmodelc", "contacts", "PeopleSuggester"),
    ("ContactRankerModel", "/System/Library/PrivateFrameworks/PeopleSuggester.framework/Versions/A/Resources/ContactRankerModel.mlmodelc", "contacts", "PeopleSuggester"),
    ("ContactRanker_watchos_ios_baxter", "/System/Library/PrivateFrameworks/PeopleSuggester.framework/Versions/A/Resources/ContactRanker_watchos_ios_baxter.mlmodelc", "contacts", "PeopleSuggester"),
    ("MentionGenerationModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/MentionGenerationModel.mlmodelc", "nlp", "CoreSuggestions"),
    ("MessageAppPredictorPeopleCentric", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/MessageAppPredictorPeopleCentric.mlmodelc", "suggestions", "CoreSuggestions"),
    ("MDNameToEmailPersonLinker", "/System/Library/PrivateFrameworks/PeopleSuggester.framework/Versions/A/Resources/MDNameToEmailPersonLinker.mlmodelc", "contacts", "PeopleSuggester"),
    ("MDNameToNamePersonLinker", "/System/Library/PrivateFrameworks/PeopleSuggester.framework/Versions/A/Resources/MDNameToNamePersonLinker.mlmodelc", "contacts", "PeopleSuggester"),
    # ── Dining / Maps ──
    ("DiningOutModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/DiningOutModel.mlmodelc", "suggestions", "CoreSuggestions"),
    ("MapsSuggestionsTransportModePrediction", "/System/Library/CoreServices/MapsSuggestionsTransportModePrediction.mlmodelc", "maps", "MapsSuggestions"),
    # ── NLP / Entity ──
    ("EEPmodel_Dictation_v1_hallucination_1", "/System/Library/PrivateFrameworks/CoreSpeech.framework/Resources/EEPmodel_Dictation_v1_hallucination_1.mlmodelc", "speech", "CoreSpeech"),
    ("EEPmodel_v8_hallucination_1", "/System/Library/PrivateFrameworks/CoreSpeech.framework/Resources/EEPmodel_v8_hallucination_1.mlmodelc", "speech", "CoreSpeech"),
    ("EntityRelevanceModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/EntityRelevanceModel.mlmodelc", "nlp", "CoreSuggestions"),
    ("EntityRerankerModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/EntityRerankerModel.mlmodelc", "nlp", "CoreSuggestions"),
    ("EntityTagging_Family", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/EntityTagging_Family.mlmodelc", "nlp", "CoreSuggestions"),
    ("EntityTagging_FamilyAndFriends", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/EntityTagging_FamilyAndFriends.mlmodelc", "nlp", "CoreSuggestions"),
    ("PPModel_NE_Filtering", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/PPModel_NE_Filtering.mlmodelc", "nlp", "CoreSuggestions"),
    ("PSC", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/PSC.mlmodelc", "nlp", "CoreSuggestions"),
    # ── Vision / Image ──
    ("ETShadowModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/ETShadowModel.mlmodelc", "vision", "CoreSuggestions"),
    ("ImageClassifier", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/ImageClassifier.mlmodelc", "vision", "CoreSuggestions"),
    ("Image_Estimator_HEIF", "/System/iOSSupport/System/Library/PrivateFrameworks/IMTranscoderAgent.framework/Versions/A/Resources/Image_Estimator_HEIF.mlmodelc", "vision", "IMTranscoderAgent"),
    ("MonzaV4_1", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/MonzaV4_1.mlmodelc", "vision", "CoreSuggestions"),
    # ── Search / Ranking ──
    ("FCUserVectorModel", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/FCUserVectorModel.mlmodelc", "search", "CoreSuggestions"),
    ("L2XGBRegressor", "/System/Library/PrivateFrameworks/SpotlightServices.framework/Versions/A/Resources/L2XGBRegressor.mlmodelc", "search", "SpotlightServices"),
    ("LOITypeToOneHotTransformer", "/System/Library/PrivateFrameworks/CoreSuggestions.framework/Resources/LOITypeToOneHotTransformer.mlmodelc", "search", "CoreSuggestions"),
    # ── Sound Analysis ──
    ("SNAudioQualityModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNAudioQualityModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNSoundPrintAEmbeddingModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNSoundPrintAEmbeddingModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNSoundPrintKEmbeddingModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNSoundPrintKEmbeddingModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishBabbleModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishBabbleModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishCheeringModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishCheeringModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishEmbeddingModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishEmbeddingModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishFireAlarmModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishFireAlarmModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishLaughterModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishLaughterModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishMusicModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishMusicModel.mlmodelc", "audio", "SoundAnalysis"),
    ("SNVGGishSpeechModel", "/System/Library/Frameworks/SoundAnalysis.framework/Versions/A/Resources/SNVGGishSpeechModel.mlmodelc", "audio", "SoundAnalysis"),
]
# ══════════════════════════════════════════════════════════════════════
# APPLE AI FRAMEWORKS
# ══════════════════════════════════════════════════════════════════════
# Each entry is a 2-tuple: (framework_name, category). The filesystem path is
# resolved at scan time (PrivateFrameworks first, then Frameworks) in
# scan_and_register().
APPLE_FRAMEWORKS = [
    # ── AIML Infrastructure ──
    ("AIMLExperimentationAnalytics", "aiml-infra"),
    ("AIMLInstrumentationStreams", "aiml-infra"),
    # ── Suggestions ──
    ("AccountSuggestions", "suggestions"),
    ("CoreSuggestions", "suggestions"),
    ("CoreSuggestionsInternals", "suggestions"),
    ("CoreSuggestionsML", "suggestions"),
    ("CoreSuggestionsUI", "suggestions"),
    ("MapsSuggestions", "suggestions"),
    ("Suggestions", "suggestions"),
    ("SuggestionsSpotlightMetrics", "suggestions"),
    ("PeopleSuggester", "suggestions"),
    ("ProactiveML", "suggestions"),
    ("ProactiveSuggestionClientModel", "suggestions"),
    # ── Neural Engine / ML Runtime ──
    ("AppleNeuralEngine", "neural-engine"),
    ("NeuralNetworks", "neural-engine"),
    ("CPMLBestShim", "ml-runtime"),
    ("CipherML", "ml-runtime"),
    ("MLAssetIO", "ml-runtime"),
    ("MLCompilerRuntime", "ml-runtime"),
    ("MLCompilerServices", "ml-runtime"),
    ("MLModelSpecification", "ml-runtime"),
    ("MLRuntime", "ml-runtime"),
    ("CoreMLTestFramework", "ml-runtime"),
    ("LighthouseCoreMLFeatureStore", "ml-runtime"),
    ("LighthouseCoreMLModelAnalysis", "ml-runtime"),
    ("LighthouseCoreMLModelStore", "ml-runtime"),
    ("RemoteCoreML", "ml-runtime"),
    ("MediaML", "ml-runtime"),
    ("MediaMLServices", "ml-runtime"),
    ("SAML", "ml-runtime"),
    # ── Speech / TTS ──
    ("CoreSpeech", "speech"),
    ("CoreSpeechExclave", "speech"),
    ("CoreSpeechFoundation", "speech"),
    ("CoreEmbeddedSpeechRecognition", "speech"),
    ("LocalSpeechRecognitionBridge", "speech"),
    ("LiveSpeechServices", "speech"),
    ("LiveSpeechUI", "speech"),
    ("SpeechDetector", "speech"),
    ("SpeechDictionary", "speech"),
    ("SpeechObjects", "speech"),
    ("SpeechRecognitionCommandServices", "speech"),
    ("SpeechRecognitionCore", "speech"),
    ("SpeechRecognitionSharedSupport", "speech"),
    ("TextToSpeech", "tts"),
    ("TextToSpeechBundleSupport", "tts"),
    ("TextToSpeechKonaSupport", "tts"),
    ("TextToSpeechMauiSupport", "tts"),
    ("TextToSpeechVoiceBankingSupport", "tts"),
    ("TextToSpeechVoiceBankingUI", "tts"),
    ("DataDetectorsNaturalLanguage", "nlp"),
    # ── Apple Intelligence ──
    ("IntelligenceEngine", "apple-intelligence"),
    ("IntelligencePlatform", "apple-intelligence"),
    ("IntelligencePlatformCompute", "apple-intelligence"),
    ("IntelligencePlatformCore", "apple-intelligence"),
    ("IntelligencePlatformLibrary", "apple-intelligence"),
    ("OSIntelligence", "apple-intelligence"),
    ("PersonalIntelligenceCore", "apple-intelligence"),
    # ── Siri ──
    ("SiriActivationFoundation", "siri"),
    ("SiriAnalytics", "siri"),
    ("SiriAppLaunchIntents", "siri"),
    ("SiriAppResolution", "siri"),
    ("SiriAudioIntentUtils", "siri"),
    ("SiriAudioInternal", "siri"),
    ("SiriAudioSnippetKit", "siri"),
    ("SiriAudioSupport", "siri"),
    ("SiriCalendarIntents", "siri"),
    ("SiriCalendarUI", "siri"),
    ("SiriCam", "siri"),
    ("SiriContactsIntents", "siri"),
    ("SiriCore", "siri"),
    ("SiriCoreMetrics", "siri"),
    ("SiriCorrections", "siri"),
    ("SiriCrossDeviceArbitration", "siri"),
    ("SiriCrossDeviceArbitrationFeedback", "siri"),
    ("SiriDailyBriefingInternal", "siri"),
    ("SiriDialogEngine", "siri"),
    ("SiriEmergencyIntents", "siri"),
    ("SiriEntityMatcher", "siri"),
    ("SiriFindMy", "siri"),
    ("SiriFlowEnvironment", "siri"),
    ("SiriFoundation", "siri"),
    ("SiriGeo", "siri"),
    ("SiriHomeAccessoryFramework", "siri"),
    ("SiriIdentityInternal", "siri"),
    ("SiriInCall", "siri"),
    ("SiriInference", "siri"),
    ("SiriInferenceFlow", "siri"),
    ("SiriInferenceIntents", "siri"),
    ("SiriInformationSearch", "siri"),
    ("SiriInformationTypes", "siri"),
    ("SiriInstrumentation", "siri"),
    ("SiriIntentEvents", "siri"),
    ("SiriInteractive", "siri"),
    ("SiriKitFlow", "siri"),
    ("SiriKitInvocation", "siri"),
    ("SiriKitRuntime", "siri"),
    ("SiriLiminal", "siri"),
    ("SiriMailInternal", "siri"),
    ("SiriMailUI", "siri"),
    ("SiriMessageBus", "siri"),
    ("SiriMessageTypes", "siri"),
    ("SiriMessagesCommon", "siri"),
    ("SiriMessagesFlow", "siri"),
    ("SiriMessagesUI", "siri"),
    ("SiriNLUOverrides", "siri"),
    ("SiriNLUTypes", "siri"),
    ("SiriNaturalLanguageGeneration", "siri"),
    ("SiriNaturalLanguageParsing", "siri"),
    ("SiriNetwork", "siri"),
    ("SiriNotebook", "siri"),
    ("SiriNotificationsIntents", "siri"),
    ("SiriObservation", "siri"),
    ("SiriOntology", "siri"),
    ("SiriOntologyProtobuf", "siri"),
    ("SiriPaymentsIntents", "siri"),
    ("SiriPlaybackControlIntents", "siri"),
    ("SiriPlaybackControlSupport", "siri"),
    ("SiriPowerInstrumentation", "siri"),
    ("SiriPrivateLearningAnalytics", "siri"),
    ("SiriPrivateLearningInference", "siri"),
    ("SiriPrivateLearningLogging", "siri"),
    ("SiriReferenceResolution", "siri"),
    ("SiriReferenceResolutionDataModel", "siri"),
    ("SiriReferenceResolver", "siri"),
    ("SiriRemembers", "siri"),
    ("SiriRequestDispatcher", "siri"),
    ("SiriSettingsIntents", "siri"),
    ("SiriSetup", "siri"),
    ("SiriSharedUI", "siri"),
    ("SiriSignals", "siri"),
    ("SiriSocialConversation", "siri"),
    ("SiriSpeechSynthesis", "siri"),
    ("SiriSuggestions", "siri"),
    ("SiriSuggestionsAPI", "siri"),
    ("SiriSuggestionsIntelligence", "siri"),
    ("SiriSuggestionsKit", "siri"),
    ("SiriSuggestionsSupport", "siri"),
    ("SiriTTS", "siri"),
    ("SiriTTSService", "siri"),
    ("SiriTTSTraining", "siri"),
    ("SiriTaskEngagement", "siri"),
    ("SiriTasks", "siri"),
    ("SiriTimeAlarmInternal", "siri"),
    ("SiriTimeInternal", "siri"),
    ("SiriTimeTimerInternal", "siri"),
    ("SiriTranslationIntents", "siri"),
    ("SiriUI", "siri"),
    ("SiriUIBridge", "siri"),
    ("SiriUICore", "siri"),
    ("SiriUIFoundation", "siri"),
    ("SiriUserSegments", "siri"),
    ("SiriUtilities", "siri"),
    ("SiriVOX", "siri"),
    ("SiriVideoIntents", "siri"),
    ("SiriVirtualDeviceResolution", "siri"),
    ("SiriWellnessIntents", "siri"),
    # ── Parsec (Search) ──
    ("CoreParsec", "search"),
    ("ParsecModel", "search"),
    ("ParsecSubscriptionServiceSupport", "search"),
    # ── Knowledge / Vision ──
    ("CoreKnowledge", "knowledge"),
    ("KnowledgeGraphKit", "knowledge"),
    ("KnowledgeMonitor", "knowledge"),
    ("PhotosIntelligence", "vision"),
    ("PhotosKnowledgeGraph", "vision"),
    ("VisionCore", "vision"),
    ("VisionKitCore", "vision"),
    ("VisualIntelligence", "vision"),
    ("SensitiveContentAnalysisML", "vision"),
    ("PostSiriEngagement", "engagement"),
]
def scan_and_register():
    """Scan all Apple AI models and frameworks, register with SHA-2048.

    Iterates APPLE_MODELS (.mlmodelc paths) and APPLE_FRAMEWORKS, registers
    each entry with the ModelRegistry, prints a per-entry status line
    (VERIFIED when the path exists on disk, INDEXED otherwise), and finishes
    with aggregate statistics. Returns the stats dict from registry.stats().
    """
    registry = ModelRegistry()
    # NOTE(review): several banner/separator glyphs below look stripped by a
    # transfer-encoding step (e.g. f"{'' * 70}" renders as an empty line) —
    # confirm the intended box-drawing characters against the original file.
    print(f"""
{PINK}╔══════════════════════════════════════════════════════════════╗{RESET}
{PINK}{RESET}  {WHITE}ROADCHAIN MODEL VERIFICATION{RESET}{AMBER}SHA-2048{RESET}  {PINK}{RESET}
{PINK}{RESET}  {DIM}identity > provider{RESET}  {PINK}{RESET}
{PINK}╚══════════════════════════════════════════════════════════════╝{RESET}
""")
    # ── Register CoreML Models ──
    print(f"{WHITE}Verifying Apple AI Models (.mlmodelc)...{RESET}")
    print(f"{'' * 70}")
    model_count = 0
    for name, path, category, framework in APPLE_MODELS:
        # Existence is checked before registration; registration proceeds
        # either way so missing models are still indexed on-chain.
        exists = Path(path).exists()
        record = registry.register_model(
            name=name,
            path=path,
            model_type="mlmodelc",
            vendor="apple",
            category=category,
            framework=framework,
        )
        status = f"{GREEN}VERIFIED{RESET}" if exists else f"{AMBER}INDEXED{RESET}"
        # size_bytes is 0 for unreadable/missing paths — show a placeholder.
        size_str = f"{record.size_bytes / 1024:.0f}KB" if record.size_bytes > 0 else "---"
        print(f"  {status} {name:<50} {record.short_id} {size_str}")
        model_count += 1
    print(f"\n{GREEN}{model_count} models registered{RESET}\n")
    # ── Register Frameworks ──
    print(f"{WHITE}Verifying Apple AI Frameworks...{RESET}")
    print(f"{'' * 70}")
    fw_count = 0
    for name, category in APPLE_FRAMEWORKS:
        # Try both Frameworks and PrivateFrameworks
        fw_path = f"/System/Library/PrivateFrameworks/{name}.framework"
        if not Path(fw_path).exists():
            fw_path = f"/System/Library/Frameworks/{name}.framework"
        record = registry.register_model(
            name=name,
            path=fw_path,
            model_type="framework",
            vendor="apple",
            category=category,
            framework=name,
        )
        exists = Path(fw_path).exists()
        status = f"{GREEN}VERIFIED{RESET}" if exists else f"{AMBER}INDEXED{RESET}"
        print(f"  {status} {name:<50} {record.short_id}")
        fw_count += 1
    print(f"\n{GREEN}{fw_count} frameworks registered{RESET}\n")
    # ── Stats ──
    stats = registry.stats()
    print(f"{PINK}{'' * 70}{RESET}")
    print(f"{WHITE}SHA-2048 MODEL VERIFICATION COMPLETE{RESET}")
    print(f"{PINK}{'' * 70}{RESET}")
    print(f"  Total: {stats['total_models']} models + frameworks")
    print(f"  Verified: {GREEN}{stats['verified']}{RESET}")
    print(f"  Size: {stats['total_size_mb']} MB indexed")
    print()
    print(f"  {WHITE}By Vendor:{RESET}")
    for v, c in stats["vendors"].items():
        print(f"    {v:<20} {c}")
    print()
    print(f"  {WHITE}By Type:{RESET}")
    for t, c in stats["types"].items():
        print(f"    {t:<20} {c}")
    print()
    print(f"  {WHITE}By Category:{RESET}")
    for cat, c in stats["categories"].items():
        print(f"    {cat:<24} {c}")
    print()
    print(f"  {DIM}identity > provider — every model has a 2048-bit fingerprint{RESET}")
    registry.close()
    return stats
def show_stats():
    """Dump the registry's aggregate statistics as key/value lines."""
    registry = ModelRegistry()
    snapshot = registry.stats()
    registry.close()
    print(f"{WHITE}Model Registry Stats:{RESET}")
    for key in snapshot:
        print(f"  {key}: {snapshot[key]}")
def list_models():
    """Print a table of every registered model/framework record."""
    registry = ModelRegistry()
    records = registry.list_all()
    registry.close()
    # NOTE(review): the separator f-strings below expand to empty output
    # ('' * 80) — the original glyph appears lost in transfer; confirm.
    print(f"{WHITE}Verified Models ({len(records)}):{RESET}")
    print(f"{'' * 80}")
    print(f"  {'Name':<45} {'Type':<12} {'ID':<18} {'Category'}")
    print(f"{'' * 80}")
    for rec in records:
        print(f"  {rec.name:<45} {rec.model_type:<12} {rec.short_id:<18} {rec.metadata.get('category', '')}")
if __name__ == "__main__":
    # Simple flag dispatch; with no recognized flag, run the full scan.
    cli_args = sys.argv[1:]
    if "--stats" in cli_args:
        show_stats()
    elif "--list" in cli_args:
        list_models()
    elif "--verify" in cli_args:
        print("Re-verifying all models...")
        registry = ModelRegistry()
        for rec in registry.list_all():
            unchanged = registry.verify_model(rec.name)
            label = f"{GREEN}OK{RESET}" if unchanged else f"{RED}CHANGED{RESET}"
            print(f"  {label} {rec.name}")
        registry.close()
    else:
        scan_and_register()