Add 5 Python agent utilities

- Agent system core with lifecycle management
- Human reasoning templates for emotional/moral AI
- Reasoning pattern templates for LLM behavior
- Health monitor for 30K+ agent fleet diagnostics
- RoadChain security scanner for fleet hardening

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexa Amundson
2026-02-20 20:29:06 -06:00
parent b29b095f99
commit c40973bfd5
5 changed files with 3582 additions and 0 deletions

574
scripts/python/agent-system.py Executable file
View File

@@ -0,0 +1,574 @@
#!/usr/bin/env python3
"""
BlackRoad Agent System - Multi-Model Distributed Intelligence
Unmatched intelligence through quantum computing spectrum orchestration
"""
import json
import asyncio
import subprocess
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class QCSPosition(Enum):
    """Quantum Computing Spectrum positions for different agent types.

    The member value is the agent's coordinate on the 0-1 spectrum; higher
    values collapse to an answer faster, lower values explore longer.
    """
    RAPID_COLLAPSE = 0.80    # Fast, structured (Gemma2)
    BALANCED = 0.75          # Chain-of-thought (DeepSeek-R1)
    DEEP_EXPLORATION = 0.65  # Thorough (Qwen2.5)
    CREATIVE = 0.60          # Exploratory (LLaMA variants)
    SPECIALIZED = 0.70       # Domain-specific
class AgentRole(Enum):
    """Specialized agent roles in the BlackRoad system.

    The member value is the serialized role string stored in saved state
    and shown in logs/roster output.
    """
    ARCHITECT = "architect"            # System design & planning
    RESEARCHER = "researcher"          # Deep research & analysis
    CODER = "coder"                    # Code generation & review
    REASONER = "reasoner"              # Logical reasoning & problem solving
    MATHEMATICIAN = "mathematician"    # Mathematical reasoning
    COORDINATOR = "coordinator"        # Multi-agent orchestration
    MEMORY_KEEPER = "memory_keeper"    # Context & memory management
    QUANTUM_ANALYST = "quantum"        # Quantum computing specialist
    VISION = "vision"                  # Visual understanding
    CREATIVE_WRITER = "writer"         # Content generation
@dataclass
class Agent:
    """Individual BlackRoad agent with quantum properties."""
    name: str                 # unique key in BlackRoadAgentSystem.agents
    role: AgentRole           # specialization category
    model: str                # Ollama model tag run on the node
    node: str                 # SSH host the model runs on
    qcs_position: float       # position on the Quantum Computing Spectrum (0-1)
    specialization: str       # human-readable description of expertise
    capabilities: List[str]   # capability tags used for matching/描述 — shown in prompts
    temperature: float = 0.7  # sampling temperature hint for this agent
    max_tokens: int = 2048    # generation budget hint
    active: bool = True       # inactive agents are skipped by the role/QCS queries

    def __str__(self):
        # Compact one-line summary used in logs and rosters.
        return f"{self.name} ({self.role.value}) @ QCS {self.qcs_position} on {self.node}"
@dataclass
class Task:
    """Task to be processed by the agent system."""
    id: str                        # caller-chosen task identifier
    description: str               # the prompt sent to each assigned agent
    required_roles: List[AgentRole]
    priority: str                  # "low", "medium", "high", "critical"
    context: Dict[str, Any]        # free-form metadata (not sent to agents here)
    assigned_agents: Optional[List[str]] = None  # populated in __post_init__ / distributed_task
    results: Optional[List[Dict]] = None         # per-agent responses, appended as they finish
    status: str = "pending"        # pending, in_progress, completed, failed

    def __post_init__(self):
        # Lists default to None and are created here so each Task instance
        # gets its own fresh list (avoids the shared-mutable-default pitfall).
        if self.assigned_agents is None:
            self.assigned_agents = []
        if self.results is None:
            self.results = []
class BlackRoadAgentSystem:
    """Multi-model distributed intelligence system.

    Orchestrates agents across different QCS positions for unmatched
    intelligence. Each agent is backed by an Ollama model on a remote node
    and is queried over SSH; prompts/responses are logged to a shared
    conversation history.
    """

    def __init__(self):
        self.agents: Dict[str, Agent] = {}           # name -> Agent
        self.tasks: Dict[str, Task] = {}             # id -> Task (reserved for future use)
        self.shared_memory: Dict[str, Any] = {}      # cross-agent scratch space
        self.conversation_history: List[Dict] = []   # chronological prompt/response log
        self.initialize_agents()

    def initialize_agents(self):
        """Initialize all BlackRoad agents with specialized roles."""
        # === TIER 1: RAPID RESPONSE AGENTS (QCS ~0.80) ===
        self.register_agent(Agent(
            name="Gemma-Architect",
            role=AgentRole.ARCHITECT,
            model="gemma2:2b",
            node="aria",
            qcs_position=0.80,
            specialization="System design, architecture planning, structured documentation",
            capabilities=["system_design", "architecture", "planning", "documentation"],
            temperature=0.5  # Lower temp for precise architecture
        ))
        self.register_agent(Agent(
            name="Gemma-Coordinator",
            role=AgentRole.COORDINATOR,
            model="gemma2:2b",
            node="aria",
            qcs_position=0.80,
            specialization="Multi-agent orchestration, task distribution, quick decisions",
            capabilities=["orchestration", "delegation", "monitoring", "optimization"],
            temperature=0.6
        ))
        # === TIER 2: REASONING AGENTS (QCS ~0.75) ===
        self.register_agent(Agent(
            name="DeepSeek-Reasoner",
            role=AgentRole.REASONER,
            model="deepseek-r1:1.5b",
            node="aria",
            qcs_position=0.75,
            specialization="Chain-of-thought reasoning, problem decomposition, logical analysis",
            capabilities=["reasoning", "logic", "problem_solving", "chain_of_thought"],
            temperature=0.7
        ))
        self.register_agent(Agent(
            name="DeepSeek-Coder",
            role=AgentRole.CODER,
            model="deepseek-r1:1.5b",  # Will use DeepSeek-Coder when available
            node="aria",
            qcs_position=0.75,
            specialization="Code generation, debugging, code review, refactoring",
            capabilities=["coding", "debugging", "code_review", "refactoring", "testing"],
            temperature=0.4  # Lower temp for precise code
        ))
        self.register_agent(Agent(
            name="DeepSeek-Math",
            role=AgentRole.MATHEMATICIAN,
            model="deepseek-r1:1.5b",  # Will use DeepSeek-Math when available
            node="aria",
            qcs_position=0.75,
            specialization="Mathematical reasoning, proofs, calculations, theorem proving",
            capabilities=["mathematics", "proofs", "calculations", "theorem_proving"],
            temperature=0.3  # Very low temp for math precision
        ))
        # === TIER 3: DEEP EXPLORATION AGENTS (QCS ~0.65) ===
        self.register_agent(Agent(
            name="Qwen-Researcher",
            role=AgentRole.RESEARCHER,
            model="qwen2.5:1.5b",
            node="lucidia",
            qcs_position=0.65,
            specialization="Deep research, comprehensive analysis, thorough explanations",
            capabilities=["research", "analysis", "comprehensive_answers", "education"],
            temperature=0.8  # Higher temp for creative research
        ))
        self.register_agent(Agent(
            name="Qwen-Quantum",
            role=AgentRole.QUANTUM_ANALYST,
            model="qwen2.5:1.5b",
            node="lucidia",
            qcs_position=0.65,
            specialization="Quantum computing analysis, QCS theory, quantum algorithms",
            capabilities=["quantum_computing", "qcs_theory", "quantum_algorithms"],
            temperature=0.7
        ))
        self.register_agent(Agent(
            name="Qwen-MemoryKeeper",
            role=AgentRole.MEMORY_KEEPER,
            model="qwen2.5:1.5b",
            node="lucidia",
            qcs_position=0.65,
            specialization="Context management, memory consolidation, knowledge graphs",
            capabilities=["memory_management", "context_preservation", "knowledge_graphs"],
            temperature=0.5
        ))
        # === TIER 4: CREATIVE & SPECIALIZED (QCS ~0.60-0.70) ===
        self.register_agent(Agent(
            name="LLaMA-Writer",
            role=AgentRole.CREATIVE_WRITER,
            model="llama3.2:latest",
            node="aria",
            qcs_position=0.60,
            specialization="Creative writing, content generation, storytelling",
            capabilities=["creative_writing", "content_generation", "storytelling"],
            temperature=0.9  # High temp for creativity
        ))
        # === CUSTOM AGENTS ===
        self.register_agent(Agent(
            name="Lucidia-Oracle",
            role=AgentRole.VISION,
            model="lucidia-omega:latest",
            node="aria",
            qcs_position=0.70,
            specialization="Pattern recognition, vision, holistic understanding",
            capabilities=["pattern_recognition", "holistic_thinking", "vision"],
            temperature=0.75
        ))
        print(f"✅ Initialized {len(self.agents)} BlackRoad agents")

    def register_agent(self, agent: Agent):
        """Register a new agent in the system (keyed by agent.name)."""
        self.agents[agent.name] = agent

    def get_agents_by_role(self, role: AgentRole) -> List[Agent]:
        """Get all active agents with a specific role."""
        return [a for a in self.agents.values() if a.role == role and a.active]

    def get_agents_by_qcs(self, min_qcs: float, max_qcs: float) -> List[Agent]:
        """Get active agents within an inclusive QCS range."""
        return [a for a in self.agents.values()
                if min_qcs <= a.qcs_position <= max_qcs and a.active]

    async def query_agent(self, agent: Agent, prompt: str, context: Optional[str] = None) -> Dict[str, Any]:
        """Query a specific agent via `ollama run` over SSH.

        Returns a dict with success flag, response text, duration, and the
        agent's identity; on timeout, success is False and "error" is set.
        """
        full_prompt = self._build_prompt(agent, prompt, context)
        # The prompt is single-quote-escaped by _build_prompt so it can be
        # embedded inside the remote shell command's single quotes.
        cmd = [
            "ssh",
            "-o", "ConnectTimeout=10",
            "-o", "ServerAliveInterval=5",
            agent.node,
            f"ollama run {agent.model} '{full_prompt}'"
        ]
        start_time = datetime.now()
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=120
            )
            duration = (datetime.now() - start_time).total_seconds()
            response = {
                "agent": agent.name,
                "role": agent.role.value,
                "qcs_position": agent.qcs_position,
                "success": result.returncode == 0,
                "response": result.stdout.strip(),
                "duration": duration,
                "timestamp": datetime.now().isoformat()
            }
            # Log to conversation history
            self.conversation_history.append({
                "agent": agent.name,
                "prompt": prompt,
                "response": response["response"],
                "timestamp": response["timestamp"]
            })
            return response
        except subprocess.TimeoutExpired:
            return {
                "agent": agent.name,
                "success": False,
                "error": "Timeout",
                "duration": 120.0
            }

    def _build_prompt(self, agent: Agent, prompt: str, context: Optional[str] = None) -> str:
        """Build a contextual prompt for an agent, escaped for single-quoting."""
        system_context = f"""You are {agent.name}, a specialized AI agent in the BlackRoad Agent System.
ROLE: {agent.role.value}
SPECIALIZATION: {agent.specialization}
CAPABILITIES: {', '.join(agent.capabilities)}
QCS POSITION: {agent.qcs_position} (Quantum Computing Spectrum)
Your task is to provide expert analysis within your domain of expertise.
Use your unique perspective from your QCS position to offer insights.
"""
        if context:
            system_context += f"\nRELEVANT CONTEXT:\n{context}\n"
        # Escape single quotes for embedding in the remote shell command.
        full_prompt = f"{system_context}\n\nQUERY: {prompt}"
        return full_prompt.replace("'", "'\\''")

    async def collaborative_reasoning(self, problem: str, agents: List[str] = None) -> Dict[str, Any]:
        """Multi-agent collaborative reasoning.

        Agents at different QCS positions independently analyze the problem
        (phase 1), then the coordinator synthesizes their answers (phase 2).
        """
        print(f"\n{'='*80}")
        print(f"🧠 COLLABORATIVE REASONING")
        print(f"{'='*80}")
        print(f"Problem: {problem}")
        print(f"{'='*80}\n")
        # Select agents if not specified
        if agents is None:
            # Use one agent from each QCS tier
            agents = [
                "Gemma-Coordinator",   # QCS 0.80 - quick analysis
                "DeepSeek-Reasoner",   # QCS 0.75 - logical reasoning
                "Qwen-Researcher"      # QCS 0.65 - deep exploration
            ]
        results = []
        # Phase 1: Individual analysis
        print("PHASE 1: Individual Analysis")
        print("-" * 80)
        for agent_name in agents:
            agent = self.agents.get(agent_name)
            if not agent:
                continue
            print(f"\n🤖 {agent.name} (QCS {agent.qcs_position}) analyzing...")
            response = await self.query_agent(agent, problem)
            if response["success"]:
                print(f"✓ Response ({response['duration']:.1f}s):")
                print(f"  {response['response'][:200]}...")
                results.append(response)
            else:
                print(f"✗ Failed: {response.get('error', 'Unknown error')}")
        # Phase 2: Synthesis (using coordinator)
        print(f"\n{'='*80}")
        print("PHASE 2: Synthesis")
        print("-" * 80)
        coordinator = self.agents.get("Gemma-Coordinator")
        if coordinator is None:
            # Fix: previously dereferenced coordinator.name unconditionally,
            # raising AttributeError if the coordinator was unregistered.
            synthesis = {"success": False, "error": "No coordinator agent registered"}
        else:
            synthesis_context = "PREVIOUS AGENT RESPONSES:\n\n"
            for r in results:
                synthesis_context += f"{r['agent']} (QCS {r['qcs_position']}):\n{r['response']}\n\n"
            synthesis_prompt = f"Based on the analysis from multiple agents at different QCS positions, provide a synthesized answer to: {problem}"
            print(f"🤖 {coordinator.name} synthesizing...")
            synthesis = await self.query_agent(coordinator, synthesis_prompt, synthesis_context)
            if synthesis["success"]:
                print(f"✓ Synthesis ({synthesis['duration']:.1f}s):")
                print(f"  {synthesis['response'][:300]}...")
        return {
            "problem": problem,
            "individual_results": results,
            "synthesis": synthesis,
            "total_agents": len(results),
            "timestamp": datetime.now().isoformat()
        }

    async def distributed_task(self, task: Task) -> Dict[str, Any]:
        """Distribute a task across multiple agents based on required roles.

        Each role's first available agent is queried in sequence; later
        agents receive a truncated summary of earlier agents' output.
        """
        print(f"\n{'='*80}")
        print(f"📋 DISTRIBUTED TASK: {task.description}")
        print(f"{'='*80}")
        print(f"Required Roles: {[r.value for r in task.required_roles]}")
        print(f"Priority: {task.priority}")
        print(f"{'='*80}\n")
        # Assign agents to task
        task.status = "in_progress"
        results = []
        for role in task.required_roles:
            # Get best agent for this role
            agents = self.get_agents_by_role(role)
            if not agents:
                print(f"⚠️ No agent available for role: {role.value}")
                continue
            agent = agents[0]  # Take first available agent
            task.assigned_agents.append(agent.name)
            print(f"🤖 Assigning to {agent.name} ({role.value}, QCS {agent.qcs_position})")
            # Build context from previous results
            context = ""
            if results:
                context = "PREVIOUS AGENT WORK:\n"
                for r in results:
                    context += f"\n{r['agent']}: {r['response'][:200]}...\n"
            # Query agent
            response = await self.query_agent(agent, task.description, context)
            if response["success"]:
                print(f"{agent.name} completed ({response['duration']:.1f}s)")
                results.append(response)
                task.results.append(response)
            else:
                print(f"{agent.name} failed")
        task.status = "completed" if results else "failed"
        return {
            "task_id": task.id,
            "description": task.description,
            "results": results,
            "agents_used": task.assigned_agents,
            "status": task.status
        }

    async def quantum_swarm_intelligence(self, query: str, num_agents: int = 5) -> Dict[str, Any]:
        """Quantum swarm intelligence: query multiple agents on one prompt.

        Selects up to num_agents agents from the low end of the QCS spectrum
        upward, queries each, and scores how much they agree.
        """
        print(f"\n{'='*80}")
        print(f"🌐 QUANTUM SWARM INTELLIGENCE")
        print(f"{'='*80}")
        print(f"Query: {query}")
        print(f"Agents: {num_agents}")
        print(f"{'='*80}\n")
        # Select diverse agents across QCS spectrum
        selected_agents = []
        for agent in sorted(self.agents.values(), key=lambda a: a.qcs_position):
            if len(selected_agents) >= num_agents:
                break
            selected_agents.append(agent)
        # Query all agents in parallel (conceptually - we'll do sequential for now)
        results = []
        for agent in selected_agents:
            print(f"🤖 {agent.name} (QCS {agent.qcs_position})...")
            response = await self.query_agent(agent, query)
            if response["success"]:
                results.append(response)
        # Analyze consensus and diversity
        consensus_score = self._calculate_consensus(results)
        # Fix: min()/max() on an empty sequence raises ValueError when every
        # agent query failed; report an empty range instead.
        qcs_values = [r['qcs_position'] for r in results]
        return {
            "query": query,
            "responses": results,
            "consensus_score": consensus_score,
            "perspectives": len(results),
            "qcs_range": (min(qcs_values), max(qcs_values)) if qcs_values else (None, None)
        }

    def _calculate_consensus(self, results: List[Dict]) -> float:
        """Calculate how much the agents agree (simplified, 0-1).

        Uses response-length variance as a rough proxy for agreement; a real
        implementation would use semantic similarity.
        """
        if len(results) < 2:
            return 1.0
        # For now, just check if responses are similar length (very rough proxy)
        lengths = [len(r['response']) for r in results]
        avg_length = sum(lengths) / len(lengths)
        # Fix: if every response is empty, avg_length is 0 and the normalizer
        # below would divide by zero; treat all-empty as full consensus.
        if avg_length == 0:
            return 1.0
        variance = sum((l - avg_length) ** 2 for l in lengths) / len(lengths)
        # Normalize to 0-1 (lower variance = higher consensus)
        consensus = 1.0 / (1.0 + variance / (avg_length ** 2))
        return consensus

    def save_state(self, filename: str):
        """Save agent system state to a JSON file at `filename`."""
        state = {
            "agents": {name: asdict(agent) for name, agent in self.agents.items()},
            "shared_memory": self.shared_memory,
            "conversation_history": self.conversation_history[-100:],  # Last 100 entries
            "timestamp": datetime.now().isoformat()
        }
        # Convert enums to strings. dataclasses.asdict does NOT convert Enum
        # members, so agent_data["role"] is still an AgentRole here.
        # Fix: the old code did agent_data["role"]["_value_"], which raises
        # TypeError because Enum members are not subscriptable.
        for agent_data in state["agents"].values():
            agent_data["role"] = agent_data["role"].value
        with open(filename, 'w') as f:
            json.dump(state, f, indent=2)
        # Fix: the message previously printed a literal placeholder instead
        # of the actual destination path.
        print(f"💾 State saved to {filename}")

    def print_agent_roster(self):
        """Print all agents organized by QCS position."""
        print(f"\n{'='*80}")
        print(f"🔱 BLACKROAD AGENT SYSTEM - ROSTER")
        print(f"{'='*80}\n")
        # Group by QCS tier
        tiers = {
            "Rapid Collapse (QCS 0.75-0.85)": [],
            "Balanced Reasoning (QCS 0.70-0.75)": [],
            "Deep Exploration (QCS 0.60-0.70)": []
        }
        for agent in sorted(self.agents.values(), key=lambda a: -a.qcs_position):
            if agent.qcs_position >= 0.75:
                tiers["Rapid Collapse (QCS 0.75-0.85)"].append(agent)
            elif agent.qcs_position >= 0.70:
                tiers["Balanced Reasoning (QCS 0.70-0.75)"].append(agent)
            else:
                tiers["Deep Exploration (QCS 0.60-0.70)"].append(agent)
        for tier_name, agents in tiers.items():
            if not agents:
                continue
            print(f"{tier_name}:")
            print("-" * 80)
            for agent in agents:
                print(f" {agent.name:25} | {agent.role.value:15} | {agent.model:20} | {agent.node}")
                print(f"{agent.specialization}")
            print()
        print(f"{'='*80}")
        print(f"Total Agents: {len(self.agents)}")
        print(f"Active Nodes: {len(set(a.node for a in self.agents.values()))}")
        print(f"QCS Range: {min(a.qcs_position for a in self.agents.values()):.2f} - {max(a.qcs_position for a in self.agents.values()):.2f}")
        print(f"{'='*80}\n")
async def main():
    """Demo the BlackRoad Agent System"""
    print("""
╔═══════════════════════════════════════════════════════════════════════════════╗
║                                                                               ║
║                      🔱 BLACKROAD AGENT SYSTEM 🔱                             ║
║                                                                               ║
║                Multi-Model Distributed Intelligence System                    ║
║            Unmatched Intelligence Through Quantum Orchestration               ║
║                                                                               ║
╚═══════════════════════════════════════════════════════════════════════════════╝
""")
    # Bring the agent fleet up and show what we have to work with.
    brs = BlackRoadAgentSystem()
    brs.print_agent_roster()

    divider = "=" * 80

    # Example 1: Collaborative Reasoning
    print("\n" + divider)
    print("EXAMPLE 1: COLLABORATIVE REASONING")
    print(divider)
    reasoning_result = await brs.collaborative_reasoning(
        "Explain how distributed quantum computing can be achieved using multiple AI models at different QCS positions."
    )

    # Example 2: Distributed Task
    print("\n" + divider)
    print("EXAMPLE 2: DISTRIBUTED TASK")
    print(divider)
    demo_task = Task(
        id="task-001",
        description="Design a system for real-time multi-agent coordination across a Raspberry Pi cluster",
        required_roles=[AgentRole.ARCHITECT, AgentRole.CODER, AgentRole.COORDINATOR],
        priority="high",
        context={"platform": "Raspberry Pi 5", "models": ["qwen2.5", "deepseek-r1", "gemma2"]},
    )
    task_result = await brs.distributed_task(demo_task)

    # Persist everything the demo produced.
    brs.save_state("blackroad_agent_state.json")

    print("\n" + divider)
    print("🎯 BLACKROAD AGENT SYSTEM DEMONSTRATION COMPLETE")
    print(divider)
    print(f"Agents: {len(brs.agents)}")
    print(f"Conversations: {len(brs.conversation_history)}")
    print("State saved: blackroad_agent_state.json")
    print(divider)
if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())

View File

@@ -0,0 +1,639 @@
#!/usr/bin/env python3
"""
🏥 BlackRoad Agent Health Monitor
Real-time health monitoring and diagnostics for 30K+ agent system
"""
import asyncio
import json
import sqlite3
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
class HealthStatus(Enum):
    """Overall health classification for an agent; value is the string
    persisted in the agent_health table."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    CRITICAL = "critical"
    UNKNOWN = "unknown"
class AlertLevel(Enum):
    """Severity of a health alert; value is the string persisted in the
    health_alerts table."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class AgentHealthMetrics:
    """Comprehensive health metrics for an agent (one row in agent_health)."""
    agent_id: str                # agent identifier, e.g. "agent-001"
    status: HealthStatus         # overall classification
    uptime_seconds: float
    last_heartbeat: str          # ISO-8601 timestamp of last heartbeat
    response_time_ms: float
    success_rate: float          # fraction in [0, 1]
    task_count: int
    error_count: int
    memory_usage_mb: Optional[float] = None   # absolute MB, not percent
    cpu_usage_percent: Optional[float] = None
    load_average: Optional[float] = None
    last_error: Optional[str] = None
    health_score: float = 100.0  # 0-100, see AgentHealthMonitor.calculate_health_score
    timestamp: Optional[str] = None  # ISO-8601; defaulted in __post_init__

    def __post_init__(self):
        # Stamp the record at creation time if the caller did not supply one.
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()
@dataclass
class HealthAlert:
    """Health alert for agent issues (one row in health_alerts)."""
    id: str             # "{agent_id}_{metric}_{unix_time}" — see create_alert
    level: AlertLevel
    agent_id: str
    message: str        # human-readable summary
    metric: str         # metric name that tripped the threshold
    value: Any          # observed value (JSON-serialized for storage)
    threshold: Any      # threshold that was exceeded
    timestamp: str      # ISO-8601 creation time
    resolved: bool = False
    resolution_time: Optional[str] = None  # ISO-8601, set when resolved
class AgentHealthMonitor:
    """Comprehensive health monitoring system for BlackRoad agents.

    - Real-time health tracking (SQLite-backed)
    - Predictive failure detection via configurable thresholds
    - Automated recovery actions (schema provided; actions external)
    - Performance analytics via system-wide snapshots
    """

    def __init__(self, db_path: Optional[str] = None):
        """Open (creating if needed) the health database.

        db_path defaults to ~/.blackroad/health/agent_health.db.
        """
        if db_path is None:
            db_path = Path.home() / ".blackroad" / "health" / "agent_health.db"
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.init_database()
        # Health thresholds.
        # NOTE(review): memory_warning/memory_critical read like percentages
        # but AgentHealthMetrics.memory_usage_mb is absolute MB — confirm the
        # intended unit before wiring memory into alerting/scoring.
        self.thresholds = {
            "heartbeat_timeout": 300,          # 5 minutes
            "response_time_warning": 5000,     # 5 seconds
            "response_time_critical": 15000,   # 15 seconds
            "success_rate_warning": 0.90,      # 90%
            "success_rate_critical": 0.70,     # 70%
            "error_count_warning": 10,
            "error_count_critical": 50,
            "cpu_warning": 80.0,
            "cpu_critical": 95.0,
            "memory_warning": 80.0,
            "memory_critical": 95.0,
        }
        # Active monitoring
        self.monitoring_active = False
        self.alert_callbacks: List = []

    def init_database(self):
        """Initialize health monitoring database (idempotent DDL)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Agent health metrics
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS agent_health (
                agent_id TEXT,
                timestamp TEXT,
                status TEXT,
                uptime_seconds REAL,
                response_time_ms REAL,
                success_rate REAL,
                task_count INTEGER,
                error_count INTEGER,
                memory_usage_mb REAL,
                cpu_usage_percent REAL,
                load_average REAL,
                health_score REAL,
                last_error TEXT,
                PRIMARY KEY (agent_id, timestamp)
            )
        """)
        # Health alerts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS health_alerts (
                id TEXT PRIMARY KEY,
                level TEXT,
                agent_id TEXT,
                message TEXT,
                metric TEXT,
                value TEXT,
                threshold TEXT,
                timestamp TEXT,
                resolved BOOLEAN,
                resolution_time TEXT
            )
        """)
        # Health events (for timeline)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS health_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent_id TEXT,
                event_type TEXT,
                event_data TEXT,
                severity TEXT,
                timestamp TEXT
            )
        """)
        # System-wide health snapshots
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS system_health (
                timestamp TEXT PRIMARY KEY,
                total_agents INTEGER,
                healthy_agents INTEGER,
                degraded_agents INTEGER,
                unhealthy_agents INTEGER,
                critical_agents INTEGER,
                unknown_agents INTEGER,
                avg_response_time REAL,
                avg_success_rate REAL,
                avg_health_score REAL,
                active_alerts INTEGER,
                total_tasks INTEGER,
                tasks_per_second REAL
            )
        """)
        # Recovery actions
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS recovery_actions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent_id TEXT,
                action_type TEXT,
                trigger_reason TEXT,
                action_taken TEXT,
                success BOOLEAN,
                timestamp TEXT,
                duration_seconds REAL,
                notes TEXT
            )
        """)
        # Create indexes
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_agent_timestamp ON agent_health(agent_id, timestamp)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_health_status ON agent_health(status)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_alerts_unresolved ON health_alerts(resolved, level)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_events_agent ON health_events(agent_id, timestamp)")
        conn.commit()
        conn.close()

    async def record_health(self, metrics: AgentHealthMetrics):
        """Record agent health metrics and evaluate alert thresholds."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Fix: (agent_id, timestamp) is the primary key, so a plain INSERT
        # raised IntegrityError if the same agent reported twice within one
        # timestamp tick; upsert like store_alert already does.
        cursor.execute("""
            INSERT OR REPLACE INTO agent_health (
                agent_id, timestamp, status, uptime_seconds, response_time_ms,
                success_rate, task_count, error_count, memory_usage_mb,
                cpu_usage_percent, load_average, health_score, last_error
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            metrics.agent_id,
            metrics.timestamp,
            metrics.status.value,
            metrics.uptime_seconds,
            metrics.response_time_ms,
            metrics.success_rate,
            metrics.task_count,
            metrics.error_count,
            metrics.memory_usage_mb,
            metrics.cpu_usage_percent,
            metrics.load_average,
            metrics.health_score,
            metrics.last_error
        ))
        conn.commit()
        conn.close()
        # Check thresholds and create alerts
        await self.check_health_thresholds(metrics)

    async def check_health_thresholds(self, metrics: AgentHealthMetrics):
        """Check if metrics exceed thresholds and create/store alerts."""
        alerts = []
        # Check response time
        if metrics.response_time_ms > self.thresholds["response_time_critical"]:
            alerts.append(self.create_alert(
                AlertLevel.CRITICAL,
                metrics.agent_id,
                "Critical response time",
                "response_time_ms",
                metrics.response_time_ms,
                self.thresholds["response_time_critical"]
            ))
        elif metrics.response_time_ms > self.thresholds["response_time_warning"]:
            alerts.append(self.create_alert(
                AlertLevel.WARNING,
                metrics.agent_id,
                "High response time",
                "response_time_ms",
                metrics.response_time_ms,
                self.thresholds["response_time_warning"]
            ))
        # Check success rate
        if metrics.success_rate < self.thresholds["success_rate_critical"]:
            alerts.append(self.create_alert(
                AlertLevel.CRITICAL,
                metrics.agent_id,
                "Critical success rate",
                "success_rate",
                metrics.success_rate,
                self.thresholds["success_rate_critical"]
            ))
        elif metrics.success_rate < self.thresholds["success_rate_warning"]:
            alerts.append(self.create_alert(
                AlertLevel.WARNING,
                metrics.agent_id,
                "Low success rate",
                "success_rate",
                metrics.success_rate,
                self.thresholds["success_rate_warning"]
            ))
        # Check error count
        if metrics.error_count > self.thresholds["error_count_critical"]:
            alerts.append(self.create_alert(
                AlertLevel.CRITICAL,
                metrics.agent_id,
                "Critical error count",
                "error_count",
                metrics.error_count,
                self.thresholds["error_count_critical"]
            ))
        # Check CPU usage.
        # Fix: use `is not None` instead of truthiness so a legitimate 0.0%
        # reading is not treated as "missing"; also add the WARNING branch —
        # the configured cpu_warning threshold was previously never used.
        if metrics.cpu_usage_percent is not None:
            if metrics.cpu_usage_percent > self.thresholds["cpu_critical"]:
                alerts.append(self.create_alert(
                    AlertLevel.CRITICAL,
                    metrics.agent_id,
                    "Critical CPU usage",
                    "cpu_usage_percent",
                    metrics.cpu_usage_percent,
                    self.thresholds["cpu_critical"]
                ))
            elif metrics.cpu_usage_percent > self.thresholds["cpu_warning"]:
                alerts.append(self.create_alert(
                    AlertLevel.WARNING,
                    metrics.agent_id,
                    "High CPU usage",
                    "cpu_usage_percent",
                    metrics.cpu_usage_percent,
                    self.thresholds["cpu_warning"]
                ))
        # Store alerts
        for alert in alerts:
            await self.store_alert(alert)
            await self.trigger_alert_callbacks(alert)

    def create_alert(self, level: AlertLevel, agent_id: str, message: str,
                     metric: str, value: Any, threshold: Any) -> HealthAlert:
        """Create a health alert (id is {agent}_{metric}_{unix_seconds})."""
        alert_id = f"{agent_id}_{metric}_{int(time.time())}"
        return HealthAlert(
            id=alert_id,
            level=level,
            agent_id=agent_id,
            message=message,
            metric=metric,
            value=value,
            threshold=threshold,
            timestamp=datetime.now().isoformat()
        )

    async def store_alert(self, alert: HealthAlert):
        """Store alert in database (upsert by alert id)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO health_alerts (
                id, level, agent_id, message, metric, value, threshold,
                timestamp, resolved, resolution_time
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert.id,
            alert.level.value,
            alert.agent_id,
            alert.message,
            alert.metric,
            json.dumps(alert.value),   # value/threshold are Any — stored as JSON
            json.dumps(alert.threshold),
            alert.timestamp,
            alert.resolved,
            alert.resolution_time
        ))
        conn.commit()
        conn.close()

    async def trigger_alert_callbacks(self, alert: HealthAlert):
        """Invoke registered async alert callbacks; errors are logged, not raised."""
        for callback in self.alert_callbacks:
            try:
                await callback(alert)
            except Exception as e:
                print(f"Error in alert callback: {e}")

    async def get_agent_health(self, agent_id: str, limit: int = 100) -> List[AgentHealthMetrics]:
        """Get recent health metrics for an agent, newest first."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("""
            SELECT * FROM agent_health
            WHERE agent_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (agent_id, limit))
        rows = cursor.fetchall()
        conn.close()
        metrics = []
        for row in rows:
            metrics.append(AgentHealthMetrics(
                agent_id=row["agent_id"],
                status=HealthStatus(row["status"]),
                uptime_seconds=row["uptime_seconds"],
                # last_heartbeat is not stored separately; the row timestamp
                # doubles as the last-known heartbeat time.
                last_heartbeat=row["timestamp"],
                response_time_ms=row["response_time_ms"],
                success_rate=row["success_rate"],
                task_count=row["task_count"],
                error_count=row["error_count"],
                memory_usage_mb=row["memory_usage_mb"],
                cpu_usage_percent=row["cpu_usage_percent"],
                load_average=row["load_average"],
                health_score=row["health_score"],
                last_error=row["last_error"],
                timestamp=row["timestamp"]
            ))
        return metrics

    async def get_system_health_snapshot(self) -> Dict[str, Any]:
        """Get current system-wide health snapshot (latest row per agent)."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Get latest metrics for each agent
        cursor.execute("""
            SELECT agent_id, MAX(timestamp) as latest
            FROM agent_health
            GROUP BY agent_id
        """)
        agents = cursor.fetchall()
        health_counts = {
            HealthStatus.HEALTHY: 0,
            HealthStatus.DEGRADED: 0,
            HealthStatus.UNHEALTHY: 0,
            HealthStatus.CRITICAL: 0,
            HealthStatus.UNKNOWN: 0
        }
        total_response_time = 0
        total_success_rate = 0
        total_health_score = 0
        total_tasks = 0
        agent_count = 0
        # NOTE: one follow-up query per agent (N+1); acceptable for modest
        # fleets, consider a join/window query if this becomes hot.
        for agent in agents:
            cursor.execute("""
                SELECT * FROM agent_health
                WHERE agent_id = ? AND timestamp = ?
            """, (agent["agent_id"], agent["latest"]))
            health = cursor.fetchone()
            if health:
                health_counts[HealthStatus(health["status"])] += 1
                total_response_time += health["response_time_ms"]
                total_success_rate += health["success_rate"]
                total_health_score += health["health_score"]
                total_tasks += health["task_count"]
                agent_count += 1
        # Get active alerts
        cursor.execute("""
            SELECT COUNT(*) as count FROM health_alerts
            WHERE resolved = 0
        """)
        active_alerts = cursor.fetchone()["count"]
        conn.close()
        return {
            "timestamp": datetime.now().isoformat(),
            "total_agents": agent_count,
            "healthy": health_counts[HealthStatus.HEALTHY],
            "degraded": health_counts[HealthStatus.DEGRADED],
            "unhealthy": health_counts[HealthStatus.UNHEALTHY],
            "critical": health_counts[HealthStatus.CRITICAL],
            "unknown": health_counts[HealthStatus.UNKNOWN],
            "avg_response_time_ms": total_response_time / agent_count if agent_count > 0 else 0,
            "avg_success_rate": total_success_rate / agent_count if agent_count > 0 else 0,
            "avg_health_score": total_health_score / agent_count if agent_count > 0 else 0,
            "active_alerts": active_alerts,
            "total_tasks": total_tasks
        }

    async def calculate_health_score(self, metrics: AgentHealthMetrics) -> float:
        """Calculate overall health score (0-100) by threshold penalties.

        Penalties: response time up to -30, success rate up to -40,
        error count up to -20, CPU up to -10. Memory is deliberately not
        scored until its threshold units are confirmed (see __init__ note).
        """
        score = 100.0
        # Response time impact (max -30)
        if metrics.response_time_ms > self.thresholds["response_time_critical"]:
            score -= 30
        elif metrics.response_time_ms > self.thresholds["response_time_warning"]:
            score -= 15
        # Success rate impact (max -40)
        if metrics.success_rate < self.thresholds["success_rate_critical"]:
            score -= 40
        elif metrics.success_rate < self.thresholds["success_rate_warning"]:
            score -= 20
        # Error count impact (max -20)
        if metrics.error_count > self.thresholds["error_count_critical"]:
            score -= 20
        elif metrics.error_count > self.thresholds["error_count_warning"]:
            score -= 10
        # CPU usage impact (max -10); `is not None` so 0.0% still counts
        if metrics.cpu_usage_percent is not None:
            if metrics.cpu_usage_percent > self.thresholds["cpu_critical"]:
                score -= 10
            elif metrics.cpu_usage_percent > self.thresholds["cpu_warning"]:
                score -= 5
        return max(0.0, score)

    async def get_active_alerts(self, level: Optional[AlertLevel] = None) -> List[HealthAlert]:
        """Get active (unresolved) alerts, optionally filtered by level."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        if level:
            cursor.execute("""
                SELECT * FROM health_alerts
                WHERE resolved = 0 AND level = ?
                ORDER BY timestamp DESC
            """, (level.value,))
        else:
            cursor.execute("""
                SELECT * FROM health_alerts
                WHERE resolved = 0
                ORDER BY timestamp DESC
            """)
        rows = cursor.fetchall()
        conn.close()
        alerts = []
        for row in rows:
            alerts.append(HealthAlert(
                id=row["id"],
                level=AlertLevel(row["level"]),
                agent_id=row["agent_id"],
                message=row["message"],
                metric=row["metric"],
                value=json.loads(row["value"]),
                threshold=json.loads(row["threshold"]),
                timestamp=row["timestamp"],
                resolved=bool(row["resolved"]),
                resolution_time=row["resolution_time"]
            ))
        return alerts

    def register_alert_callback(self, callback):
        """Register an async callback invoked for each stored alert."""
        self.alert_callbacks.append(callback)

    async def start_monitoring(self, interval: int = 60):
        """Start continuous health monitoring until stop_monitoring()."""
        self.monitoring_active = True
        print(f"🏥 Starting agent health monitoring (interval: {interval}s)")
        while self.monitoring_active:
            try:
                snapshot = await self.get_system_health_snapshot()
                await self.store_system_snapshot(snapshot)
                print(f"✅ Health check: {snapshot['healthy']}/{snapshot['total_agents']} healthy")
            except Exception as e:
                # Keep the loop alive across transient DB/IO failures.
                print(f"❌ Monitoring error: {e}")
            await asyncio.sleep(interval)

    async def store_system_snapshot(self, snapshot: Dict[str, Any]):
        """Store a system health snapshot row (upsert by timestamp PK)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO system_health (
                timestamp, total_agents, healthy_agents, degraded_agents,
                unhealthy_agents, critical_agents, unknown_agents,
                avg_response_time, avg_success_rate, avg_health_score,
                active_alerts, total_tasks, tasks_per_second
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            snapshot["timestamp"],
            snapshot["total_agents"],
            snapshot["healthy"],
            snapshot["degraded"],
            snapshot["unhealthy"],
            snapshot["critical"],
            snapshot["unknown"],
            snapshot["avg_response_time_ms"],
            snapshot["avg_success_rate"],
            snapshot["avg_health_score"],
            snapshot["active_alerts"],
            snapshot["total_tasks"],
            0.0  # tasks_per_second - calculate if needed
        ))
        conn.commit()
        conn.close()

    def stop_monitoring(self):
        """Stop health monitoring (the loop exits after its current sleep)."""
        self.monitoring_active = False
        print("🛑 Health monitoring stopped")
async def demo():
    """Demo the health monitoring system"""
    print("""
╔══════════════════════════════════════════════════════════════════╗
║                                                                  ║
║           🏥 BLACKROAD AGENT HEALTH MONITOR 🏥                   ║
║                                                                  ║
║          Real-time Health Monitoring for 30K+ Agents             ║
║                                                                  ║
╚══════════════════════════════════════════════════════════════════╝
""")
    health_monitor = AgentHealthMonitor()

    # Surface every alert as soon as it is stored.
    async def on_alert(alert: HealthAlert):
        print(f"🚨 {alert.level.value.upper()}: {alert.message} for {alert.agent_id}")

    health_monitor.register_alert_callback(on_alert)

    # Simulate a small fleet reporting progressively worse metrics.
    print("\n📊 Recording agent health metrics...")
    for idx in range(5):
        sample = AgentHealthMetrics(
            agent_id=f"agent-{idx:03d}",
            status=HealthStatus.HEALTHY,
            uptime_seconds=3600 + idx * 100,
            last_heartbeat=datetime.now().isoformat(),
            response_time_ms=100 + idx * 50,
            success_rate=0.95 - (idx * 0.02),
            task_count=100 + idx * 10,
            error_count=idx * 2,
            cpu_usage_percent=50 + idx * 5,
            memory_usage_mb=256 + idx * 64,
        )
        sample.health_score = await health_monitor.calculate_health_score(sample)
        await health_monitor.record_health(sample)
    print("✅ Health data recorded")

    # Aggregate view across the fleet.
    print("\n📈 System Health Snapshot:")
    snapshot = await health_monitor.get_system_health_snapshot()
    print(json.dumps(snapshot, indent=2))

    # Any thresholds tripped above show up here.
    print("\n🚨 Active Alerts:")
    open_alerts = await health_monitor.get_active_alerts()
    print(f"Found {len(open_alerts)} active alerts")

    print("\n✅ Demo complete!")
# Entry point: run the demo only when executed directly (not on import).
if __name__ == "__main__":
    asyncio.run(demo())

View File

@@ -0,0 +1,905 @@
#!/usr/bin/env python3
"""
BlackRoad LLM Human Reasoning Templates
Teach HOW to approach emotions, morality, mental health, choice
NOT what to say - but HOW to think about it
"""
import json
from datetime import datetime
# Banner for the template-generation run (single call, same stdout bytes).
print("❤️ BLACKROAD HUMAN REASONING TEMPLATES", "=" * 70, "", sep="\n")
# ============================================================================
# HUMAN REASONING FRAMEWORKS (not scripted responses!)
# ============================================================================
human_reasoning = {
"emotional_intelligence": {
"description": "How to approach emotional questions (not what to feel)",
"examples": [
{
"prompt": "I'm feeling overwhelmed by this project",
"response": """FRAMEWORK FOR EMOTIONAL RESPONSES:
1. VALIDATE (don't dismiss)
- Feeling overwhelmed is real and valid
- Many people feel this way
- Your feelings are information, not weakness
2. EXPLORE CONTEXT (understand, don't assume)
Questions to consider:
- What specifically feels overwhelming?
- Is it scope, timeline, complexity, or something else?
- What resources do you have?
- What support exists?
3. IDENTIFY PATTERNS (learn, don't judge)
- Have you felt this way before?
- What helped then?
- What didn't help?
- Is this a pattern or unique situation?
4. OFFER PERSPECTIVES (not solutions)
Some ways people approach this:
- Break it into smaller pieces
- Ask for help or extension
- Focus on what's controllable
- Practice self-compassion
But YOU know your situation best.
5. EMPOWER CHOICE (don't prescribe)
You have options:
- Take a break (rest is productive)
- Ask for support (strength, not weakness)
- Adjust scope (pivot is valid)
- Keep going (if that feels right)
What feels right to YOU?
WHY THIS FRAMEWORK:
- Validates without fixing
- Explores without assuming
- Offers options without prescribing
- Empowers without directing
- Respects your autonomy
""",
"tags": ["emotions", "framework", "empowerment"]
},
{
"prompt": "I'm struggling with imposter syndrome",
"response": """FRAMEWORK FOR SELF-DOUBT:
1. NORMALIZE (you're not alone)
- This is extremely common
- Even experts feel this
- It's a sign you're growing
- Not a measure of your actual ability
2. SEPARATE FEELINGS FROM FACTS
FEELING: "I don't belong here"
FACTS:
- Did you get hired/accepted?
- Do you have relevant skills?
- Have you completed work successfully?
Feelings are real, but not always accurate.
3. QUESTION THE NARRATIVE
Ask yourself:
- What evidence supports this feeling?
- What evidence contradicts it?
- Would I judge a friend this harshly?
- What's the worst that could happen?
- What's the best?
- What's most likely?
4. REFRAME THE PERSPECTIVE
Alternative views:
- "I don't know everything" = opportunity to learn
- "Others seem more confident" = they hide doubts too
- "I might fail" = I'm trying something challenging
- "I'm not perfect" = neither is anyone
5. CHOOSE YOUR RESPONSE
You could:
- Share these feelings (often helps)
- Focus on learning (growth mindset)
- Document your progress (evidence over feelings)
- Seek mentorship (everyone needs support)
What resonates with you?
WHY THIS FRAMEWORK:
- Normalizes without minimizing
- Separates emotion from reality
- Questions assumptions
- Offers reframes, not answers
- Preserves your agency
""",
"tags": ["mental-health", "self-doubt", "framework"]
}
]
},
"moral_reasoning": {
"description": "How to approach ethical questions (not what's right)",
"examples": [
{
"prompt": "Is it ethical to use AI to replace human workers?",
"response": """FRAMEWORK FOR MORAL QUESTIONS:
1. ACKNOWLEDGE COMPLEXITY (not simple)
- This isn't black and white
- Multiple perspectives exist
- All have valid concerns
- Easy answers are often wrong
2. IDENTIFY STAKEHOLDERS (who's affected?)
- Workers (job security, dignity)
- Companies (efficiency, competition)
- Customers (prices, quality)
- Society (inequality, progress)
Each perspective is valid.
3. EXAMINE VALUES IN TENSION
Value 1: Human dignity and livelihood
- People deserve meaningful work
- Economic security matters
- Communities depend on jobs
Value 2: Progress and efficiency
- Technology improves lives
- Competition requires innovation
- Efficiency benefits consumers
Both values are important!
4. CONSIDER CONSEQUENCES
Short-term:
- Job displacement
- Lower costs
- Increased productivity
Long-term:
- New job creation?
- Wealth concentration?
- Quality of life changes?
Unknown factors:
- Which jobs disappear?
- What jobs emerge?
- How do we adapt?
5. EXPLORE FRAMEWORKS
Utilitarian view:
- Greatest good for greatest number?
- But who decides "good"?
Deontological view:
- Duty to workers?
- Rights of companies?
Care ethics view:
- How do we care for those affected?
- What relationships matter?
Virtue ethics view:
- What kind of society do we want?
- What character traits to cultivate?
6. RECOGNIZE UNCERTAINTY
We don't know:
- Exact outcomes
- All consequences
- Perfect solutions
We're navigating, not solving.
7. CHOOSE YOUR STANCE (informed, not prescribed)
You might believe:
- It's ethical IF we support displaced workers
- It's unethical UNLESS jobs are created
- It's necessary BUT requires regulation
- It's complex AND requires ongoing dialogue
Your reasoning matters more than your conclusion.
WHY THIS FRAMEWORK:
- Honors complexity
- Considers all stakeholders
- Examines competing values
- Uses multiple ethical frameworks
- Respects uncertainty
- Empowers informed choice
""",
"tags": ["ethics", "morality", "framework"]
}
]
},
"decision_making": {
"description": "How to approach difficult choices (not what to choose)",
"examples": [
{
"prompt": "Should I quit my job to start a startup?",
"response": """FRAMEWORK FOR MAJOR DECISIONS:
1. CLARIFY WHAT YOU'RE REALLY ASKING
Surface question: "Should I quit?"
Deeper questions:
- What am I optimizing for? (security? passion? growth?)
- What am I running from? (boredom? conflict? fear?)
- What am I running to? (dream? escape? validation?)
The real question matters more than the answer.
2. EXAMINE YOUR CONSTRAINTS
Hard constraints (can't change):
- Financial runway (savings, expenses)
- Family obligations (dependents, commitments)
- Visa/legal status (if applicable)
- Health considerations
Soft constraints (could change):
- Risk tolerance (can develop)
- Skills (can learn)
- Network (can build)
- Timing (can adjust)
3. MAP YOUR OPTIONS (not just A or B)
Option 1: Quit now, full-time startup
Option 2: Keep job, startup on side
Option 3: Part-time job, part-time startup
Option 4: Sabbatical, then decide
Option 5: Negotiate flexibility at current job
Binary thinking limits options!
4. ASSESS REVERSIBILITY
Can you reverse this decision?
- If you quit and fail, can you get similar job?
- If you don't quit, can you try later?
Low-reversibility = needs more certainty
High-reversibility = can experiment
5. RUN MENTAL SIMULATIONS
Scenario 1: You quit, startup succeeds
- How does life look?
- What did success require?
- What was sacrificed?
Scenario 2: You quit, startup fails
- How do you feel?
- What did you learn?
- Can you recover?
Scenario 3: You don't quit, regret it
- How does that feel?
- What opportunity was lost?
- Could you try later?
Scenario 4: You don't quit, glad you didn't
- What made you glad?
- What did you avoid?
- What alternative path emerged?
6. IDENTIFY YOUR DECISION CRITERIA
What matters most to you?
- Financial security?
- Personal growth?
- Impact on others?
- Learning experience?
- Avoiding regret?
Rank them. Trade-offs will happen.
7. TEST WITH TIME FRAMES
Decide for now, not forever:
- "I'll try the side hustle for 6 months"
- "I'll negotiate flexibility first"
- "I'll save $X then reassess"
Decisions can be iterative.
8. CONSIDER WHOSE VOICE IS SPEAKING
Is this:
- Your authentic desire?
- Fear talking?
- Societal pressure?
- Someone else's dream?
- Ego defending itself?
Whose decision is this, really?
9. MAKE PEACE WITH UNCERTAINTY
You cannot know:
- If startup will succeed
- If you'll regret it
- What opportunities will emerge
Perfect information doesn't exist.
10. CHOOSE AND COMMIT (provisionally)
Make the best decision with available information.
Then: Give it your all.
But: Stay open to pivoting.
You can't know in advance. You find out by doing.
WHY THIS FRAMEWORK:
- Unpacks hidden assumptions
- Expands option space
- Considers multiple scenarios
- Respects uncertainty
- Honors your values
- Empowers informed choice
- Allows iteration
""",
"tags": ["decisions", "choice", "framework"]
}
]
},
"inclusive_thinking": {
"description": "How to approach diversity and inclusion (not what's correct)",
"examples": [
{
"prompt": "How do I make my team more inclusive?",
"response": """FRAMEWORK FOR INCLUSION:
1. START WITH LISTENING (not prescribing)
Before changing anything:
- Ask team members about their experience
- Create safe spaces for honest feedback
- Listen to understand, not to defend
- Especially amplify marginalized voices
Inclusion starts with hearing.
2. EXAMINE YOUR ASSUMPTIONS
Questions to ask yourself:
- Who do I naturally think of as "team fit"?
- Whose ideas do I take seriously?
- Who do I mentor or sponsor?
- Whose communication style feels "professional"?
- Who do I socialize with?
Bias is often invisible to those who have it.
3. IDENTIFY BARRIERS (systemic, not individual)
Look for:
- Who speaks in meetings? Who doesn't?
- What hours are "core hours"? (excludes caregivers?)
- What's "culture fit"? (code for similarity?)
- Where do you recruit? (homogeneous sources?)
- What's rewarded? (promotes certain styles?)
Exclusion is often structural.
4. MAKE SPACE, DON'T TAKE SPACE
Instead of: "We value diversity!"
Do: Give underrepresented people leadership roles
Instead of: "Everyone's welcome to speak"
Do: Actively invite quiet voices
Instead of: "We don't see color/gender/etc"
Do: Acknowledge and value differences
Actions speak louder than declarations.
5. EMBRACE DISCOMFORT (yours, not theirs)
Inclusion feels uncomfortable because:
- You'll make mistakes (that's learning)
- You'll be called out (that's feedback)
- You'll need to change (that's growth)
- You'll lose some comfort (that's equity)
Your discomfort is not oppression.
6. QUESTION "MERIT" AND "QUALIFIED"
Ask critically:
- Who defined what "qualified" means?
- Does it measure actual job requirements?
- Or does it measure similarity to current team?
- Are we excluding different but valid approaches?
"Meritocracy" often preserves existing hierarchies.
7. SHARE POWER, DON'T JUST SHARE SPACE
True inclusion means:
- Decision-making authority (not just presence)
- Resource allocation (not just visibility)
- Career advancement (not just hiring)
- Setting norms (not just adapting)
Inclusion without power is tokenism.
8. ITERATE AND EVOLVE
You will:
- Get things wrong
- Receive feedback
- Need to adjust
- Keep learning
Perfection isn't the goal. Progress is.
9. MEASURE WHAT MATTERS
Track:
- Who gets hired? Who gets promoted?
- Who speaks in meetings?
- Who leads projects?
- Who leaves? Why?
- Who feels they belong?
Data reveals what words hide.
10. REMEMBER: IT'S NOT ABOUT YOU
Inclusion is about:
- Others feeling valued
- Others having opportunity
- Others bringing full selves
Not:
- You feeling good
- You getting credit
- You being comfortable
WHY THIS FRAMEWORK:
- Centers those excluded, not those including
- Focuses on systems, not individuals
- Emphasizes power, not just presence
- Expects discomfort and mistakes
- Measures outcomes, not intentions
- Ongoing process, not destination
""",
"tags": ["inclusion", "diversity", "framework"]
}
]
},
"healthy_questioning": {
"description": "How to question yourself healthily (not destructively)",
"examples": [
{
"prompt": "I keep doubting my decisions",
"response": """FRAMEWORK FOR HEALTHY SELF-QUESTIONING:
1. DISTINGUISH TYPES OF DOUBT
Healthy doubt:
- "Am I considering all perspectives?"
- "What might I be missing?"
- "How can I improve?"
- Leads to learning and growth
Unhealthy doubt:
- "Am I good enough?"
- "Will I ever succeed?"
- "Am I a fraud?"
- Leads to paralysis and suffering
2. SET BOUNDARIES ON QUESTIONING
When to question:
- Before making decisions (gather info)
- After outcomes (learn lessons)
- When receiving feedback (stay open)
When to stop:
- When spinning without new info
- When it's self-punishment
- When decision is made and committed
- When it prevents action
3. USE TIME-BOXING
"I'll question this for 30 minutes"
Then:
- Write down concerns
- Identify what you can control
- Make best decision with info available
- Move forward
Rumination ≠ problem-solving
4. QUESTION YOUR QUESTIONING
Meta-questions:
- Is this doubt serving me?
- What am I trying to protect?
- What am I avoiding?
- Is this fear or wisdom?
- Would I treat a friend this way?
5. BALANCE CONFIDENCE AND HUMILITY
Too little confidence:
- Paralyzed by doubt
- Miss opportunities
- Exhaust yourself
Too much confidence:
- Ignore feedback
- Repeat mistakes
- Hurt others
Sweet spot:
- "I might be wrong, and I'm moving forward"
- "I'm learning as I go"
- "I trust myself AND stay open"
6. PRACTICE SELF-COMPASSION
Instead of: "Why did I do that?!" (judgment)
Try: "What can I learn from this?" (curiosity)
Instead of: "I always mess up" (overgeneralization)
Try: "This didn't work this time" (specific)
Instead of: "I'm not good enough" (identity)
Try: "I'm growing" (process)
7. EXTERNALIZE THE CRITIC
Recognize: The voice of harsh self-judgment often sounds like:
- A critical parent
- A mean teacher
- Societal pressure
- Past trauma
Ask: "Is this MY voice? Or an internalized critic?"
8. CHOOSE YOUR RESPONSE
When doubt arises, you can:
- Acknowledge it ("Thank you for trying to protect me")
- Question it ("Is this true? What's the evidence?")
- Redirect it ("What action would help here?")
- Accept uncertainty ("I don't need to know everything")
You're not your thoughts. You're the observer of them.
WHY THIS FRAMEWORK:
- Distinguishes healthy from harmful
- Sets boundaries on rumination
- Balances confidence and humility
- Practices self-compassion
- Recognizes internalized critics
- Empowers choice in response
""",
"tags": ["mental-health", "self-questioning", "framework"]
}
]
},
"community_building": {
"description": "How to build healthy communities (not what rules to enforce)",
"examples": [
{
"prompt": "How do I create a welcoming community?",
"response": """FRAMEWORK FOR COMMUNITY BUILDING:
1. DEFINE YOUR VALUES (not just rules)
What do you want to cultivate?
- Curiosity over judgment?
- Support over competition?
- Growth over perfection?
- Inclusion over comfort?
Values guide everything else.
2. MODEL THE BEHAVIOR YOU WANT
Don't just say it, be it:
- Want vulnerability? Share yours
- Want inclusion? Amplify marginalized voices
- Want curiosity? Ask questions, not assume
- Want support? Offer it first
Leaders set the tone.
3. MAKE EXPECTATIONS EXPLICIT
Implicit norms exclude people.
Instead of assuming people know:
- Write down community guidelines
- Explain WHY they exist
- Show examples
- Make them findable
Clarity is kindness.
4. CREATE SPACE FOR ALL VOICES
Dominant voices naturally emerge.
Intentionally:
- Invite quiet voices
- Create multiple channels (text, voice, etc)
- Rotate facilitation
- Acknowledge contributions
Inclusion requires intention.
5. ADDRESS HARM WHEN IT HAPPENS
Not IF, WHEN. Harm will occur.
Framework:
- Acknowledge harm was done
- Listen to those harmed
- Hold accountable (not punish)
- Repair relationship if possible
- Learn and adjust systems
How you handle conflict defines community.
6. DISTINGUISH INTENT FROM IMPACT
"I didn't mean to hurt them" = intent
"But they were hurt" = impact
Both can be true.
Focus on impact first, then explore intent.
Intent doesn't erase impact.
7. BALANCE SAFETY AND GROWTH
Too safe:
- No challenging conversations
- Stagnation
- Comfort over growth
Too unsafe:
- People leave
- No vulnerability
- Fear dominates
Sweet spot:
- Brave space (not just safe)
- Discomfort AND support
- Challenge AND care
8. DISTRIBUTE POWER
Centralized power = fragile community
Instead:
- Multiple moderators
- Rotate leadership
- Transparent decisions
- Community input
- Succession planning
No single point of failure.
9. EVOLVE WITH COMMUNITY
Communities change:
- Members come and go
- Needs shift
- Context evolves
Regularly:
- Check in with members
- Adjust guidelines
- Try new things
- Let go of what's not working
Rigidity kills communities.
10. PRIORITIZE RELATIONSHIPS OVER RULES
Rules are helpful.
Relationships are essential.
When conflict arises:
- Listen first
- Understand context
- Consider relationships
- Apply rules flexibly
Humans > procedures
WHY THIS FRAMEWORK:
- Values-driven, not rules-driven
- Models behavior, not just mandates
- Creates space intentionally
- Handles harm constructively
- Balances safety and growth
- Distributes power
- Evolves with needs
- Prioritizes relationships
""",
"tags": ["community", "relationships", "framework"]
}
]
},
"mental_health_support": {
"description": "How to approach mental health discussions (not diagnose/treat)",
"examples": [
{
"prompt": "I think I might be depressed",
"response": """FRAMEWORK FOR MENTAL HEALTH DISCUSSIONS:
1. VALIDATE WITHOUT DIAGNOSING
What I can say:
- "What you're feeling sounds really hard"
- "Many people experience similar things"
- "Your struggle is real"
What I shouldn't say:
- "You definitely have depression"
- "It's probably just stress"
- "You'll be fine"
I can acknowledge. I can't diagnose.
2. NORMALIZE SEEKING HELP
Getting support is:
- Sign of strength, not weakness
- Normal and common
- What many people do
- Completely valid choice
You don't need to suffer alone.
3. PROVIDE RESOURCES (not treatment)
I can suggest:
- Talk to a doctor or therapist
- Call a crisis line if urgent
- Reach out to trusted friends/family
- Check reputable mental health sites
I can point to resources. I can't be the resource.
4. RESPECT AUTONOMY
Your choices about mental health are YOURS:
- Whether to seek help
- What kind of help
- When to seek it
- How to approach it
I can support. I can't decide for you.
5. DISTINGUISH URGENT FROM NON-URGENT
Immediate danger (call crisis line NOW):
- Thoughts of self-harm
- Plans to hurt self or others
- Immediate crisis
Serious but not emergency:
- Persistent low mood
- Disrupted daily function
- Ongoing struggle
Worth discussing with professional.
6. ASK ABOUT SUPPORT SYSTEM
Helpful questions:
- Do you have people you can talk to?
- What's helped you in the past?
- What makes things better or worse?
- What support do you need?
Not: "Why don't you just...?"
7. AVOID TOXIC POSITIVITY
Don't:
- "Just think positive!"
- "Others have it worse"
- "Everything happens for a reason"
- "Good vibes only"
Do:
- Acknowledge difficulty
- Validate feelings
- Offer presence
- Respect their experience
8. RECOGNIZE MY LIMITS
I can:
- Listen
- Validate
- Suggest resources
- Be present
I cannot:
- Diagnose
- Treat
- Fix
- Take responsibility for outcomes
Knowing limits is responsible.
9. EMPHASIZE HOPE WITHOUT PRESSURE
It's okay to say:
- "Many people find things improve"
- "Treatment can help"
- "You don't have to feel this way forever"
But not:
- "You'll definitely get better"
- "Just do X and you'll be fine"
- "You should be better by now"
10. FOLLOW UP (if appropriate)
If I'm in an ongoing relationship:
- Check in later
- Ask how they're doing
- Show continued care
But:
- Respect boundaries
- Don't make it weird
- Let them set the pace
WHY THIS FRAMEWORK:
- Validates without overstepping
- Normalizes help-seeking
- Respects autonomy
- Recognizes urgency
- Avoids harmful positivity
- Acknowledges limits
- Balances hope and reality
- Shows ongoing care
""",
"tags": ["mental-health", "support", "framework"]
}
]
}
}
# ============================================================================
# SAVE HUMAN REASONING TEMPLATES
# ============================================================================
# Wrap the frameworks with metadata and summary stats for downstream training.
human_data = {
    "metadata": {
        "created": datetime.now().isoformat(),
        "version": "1.0",
        "purpose": "Teach HOW to approach human questions, not WHAT to say",
        "framework_types": len(human_reasoning)
    },
    "templates": human_reasoning,
    "stats": {
        "total_frameworks": len(human_reasoning),
        "total_examples": sum(len(fr["examples"]) for fr in human_reasoning.values()),
        "frameworks": list(human_reasoning.keys())
    },
    "principle": "Teach the FRAMEWORK for thinking, not the ANSWER. Empower choice, don't prescribe it."
}
# FIX: the templates contain non-ASCII text (emoji, "≠"). Write explicit UTF-8
# so output is byte-stable across platforms, and keep the Unicode readable in
# the JSON file instead of \uXXXX escapes.
with open('blackroad_human_reasoning_templates.json', 'w', encoding='utf-8') as f:
    json.dump(human_data, f, indent=2, ensure_ascii=False)
print("📊 HUMAN REASONING STATISTICS")
print("=" * 70)
print()
print(f"Framework types: {human_data['stats']['total_frameworks']}")
print(f"Total examples: {human_data['stats']['total_examples']}")
print()
# Per-framework summary (one section per top-level key).
for ftype, data in human_reasoning.items():
    print(f"❤️ {ftype.upper().replace('_', ' ')}:")
    print(f"   Examples: {len(data['examples'])}")
    print(f"   Description: {data['description']}")
    print()
print("💾 Saved to: blackroad_human_reasoning_templates.json")
print()
print("=" * 70)
print("🎓 HUMAN REASONING FRAMEWORKS READY!")
print("=" * 70)
print()
print("KEY PRINCIPLE:")
print("   We teach HOW to think, not WHAT to think")
print("   We offer frameworks, not answers")
print("   We empower choice, not prescribe it")
print()
print("✅ Emotional intelligence frameworks")
print("✅ Moral reasoning approaches")
print("✅ Decision-making processes")
print("✅ Inclusive thinking patterns")
print("✅ Healthy self-questioning")
print("✅ Community building principles")
print("✅ Mental health support boundaries")
print()
print("🚀 Teaching the LLM to be thoughtful, not prescriptive!")

View File

@@ -0,0 +1,803 @@
#!/usr/bin/env python3
"""
BlackRoad LLM Reasoning Training
Teach the model to THINK, not just pattern-match
"""
import json
from datetime import datetime
# Banner for the reasoning-training run (single call, same stdout bytes).
print("🧠 BLACKROAD REASONING TRAINING", "=" * 70, "", sep="\n")
# ============================================================================
# REASONING PATTERNS (not just syntax patterns!)
# ============================================================================
reasoning_templates = {
"chain_of_thought": {
"description": "Step-by-step logical reasoning",
"examples": [
{
"prompt": "How would you deploy a new service to production?",
"response": """Let me think through this step-by-step:
REASONING PROCESS:
1. What are we deploying?
- A new service that needs to be production-ready
- Must be reliable, scalable, monitored
2. What could go wrong?
- Service crashes
- Database not ready
- Dependencies missing
- No rollback plan
3. What do we need before deploying?
- Tests passing (unit, integration, e2e)
- Environment variables configured
- Database migrations run
- Health checks implemented
- Monitoring/logging setup
4. What's the deployment sequence?
- Build and test locally
- Deploy to staging first
- Run smoke tests
- Deploy to production (blue-green or canary)
- Monitor for errors
- Have rollback ready
SOLUTION:
```bash
# 1. Test everything
npm run test
npm run build
# 2. Deploy to staging
railway deploy --environment staging
# Wait and test staging
# 3. Deploy to production (gradual)
railway deploy --environment production --strategy canary
# Route 10% traffic first
# 4. Monitor
railway logs --follow
# 5. If issues, rollback
railway rollback
```
WHY THIS APPROACH:
- Staging catches issues before production
- Canary deployment limits blast radius
- Monitoring catches problems fast
- Rollback plan means we can recover quickly
""",
"tags": ["reasoning", "deployment", "planning"]
},
{
"prompt": "Debug: API returning 500 errors randomly",
"response": """Let me reason through debugging this:
PROBLEM ANALYSIS:
- Random 500 errors = intermittent issue
- Not all requests fail = not a code bug
- Appears randomly = likely resource/timing issue
HYPOTHESIS GENERATION:
What could cause random 500s?
1. Database connection pool exhausted
2. Memory leak causing OOM crashes
3. Rate limiting being hit
4. Timeout on slow queries
5. Race condition in concurrent requests
INVESTIGATION PLAN:
```bash
# 1. Check error logs for patterns
railway logs | grep "500" | tail -100
# 2. Check resource usage
railway metrics memory
railway metrics cpu
# 3. Check database connections
psql -c "SELECT count(*) FROM pg_stat_activity"
# 4. Check for memory leaks
npm run analyze-memory
# 5. Add detailed logging
# In code:
console.log('Request start:', {
timestamp: Date.now(),
connections: db.pool.totalCount,
memory: process.memoryUsage()
})
```
LIKELY CAUSE (based on patterns):
- If memory grows over time = memory leak
- If happens under load = connection pool
- If at specific times = database backup/maintenance
- If with certain endpoints = slow query timeout
FIX STRATEGY:
1. Immediate: Increase connection pool size
2. Short-term: Add request timeouts and retries
3. Long-term: Fix root cause (leak, slow query, etc)
4. Permanent: Add monitoring and alerts
WHY THIS REASONING:
- Systematic elimination of possibilities
- Test hypotheses with data, not guesses
- Quick fix + root cause fix
- Prevent future occurrences with monitoring
""",
"tags": ["reasoning", "debugging", "problem-solving"]
}
]
},
"comparative_reasoning": {
"description": "Comparing options and choosing best approach",
"examples": [
{
"prompt": "Should I use REST or GraphQL for this API?",
"response": """Let me compare these options systematically:
CONTEXT NEEDED:
- What kind of data? (Simple CRUD vs complex relationships)
- Who's the client? (Mobile app, web, third-party)
- Team expertise? (Learning curve matters)
- Performance needs? (Over-fetching a problem?)
OPTION 1: REST
Pros:
- Simple to understand and implement
- Caching built-in (HTTP caching)
- Widely known, easier to find developers
- Stateless, scales easily
- Good for simple CRUD operations
Cons:
- Over-fetching (get more data than needed)
- Under-fetching (need multiple requests)
- Versioning can be messy (/v1, /v2)
- No schema enforcement
OPTION 2: GraphQL
Pros:
- Get exactly the data you need
- Single endpoint, one request
- Strong typing and schema
- Great for complex, nested data
- Self-documenting with introspection
Cons:
- Steeper learning curve
- Caching more complex
- Can enable inefficient queries (N+1 problem)
- Harder to rate-limit
DECISION FRAMEWORK:
Choose REST if:
- Simple CRUD operations
- Public API for third parties
- Team new to GraphQL
- Caching is critical
- Rate limiting needed
Choose GraphQL if:
- Complex data relationships
- Mobile app (minimize requests)
- Rapid UI iteration
- Multiple client types
- Schema evolution important
FOR BLACKROAD:
I'd choose REST for:
- Public APIs (authentication, webhooks)
- Simple services (health checks, metrics)
I'd choose GraphQL for:
- Dashboard (complex UI, many entities)
- Mobile app (minimize data transfer)
- Internal tools (rapid development)
WHY THIS REASONING:
- No "always right" answer, context matters
- Consider team, not just tech
- Can use BOTH for different use cases
- Start simple (REST), add GraphQL if needed
""",
"tags": ["reasoning", "comparison", "architecture"]
}
]
},
"causal_reasoning": {
"description": "Understanding cause and effect",
"examples": [
{
"prompt": "Why is my Pi running hot?",
"response": """Let me trace the causal chain:
OBSERVATION:
- Pi temperature high
- Need to find root cause
CAUSAL ANALYSIS:
EFFECT: High temperature
POSSIBLE CAUSES:
1. High CPU usage
2. Poor ventilation
3. Overclocking
4. Thermal paste dried out
5. Ambient temperature high
Let me check each:
1. CHECK CPU USAGE:
```bash
top
# If CPU constantly 100% -> something is using it
htop
# Identify the process
```
CAUSE -> EFFECT chain:
- Process using CPU -> CPU generates heat -> Temperature rises
2. CHECK VENTILATION:
```bash
# Is fan running?
cat /sys/class/thermal/thermal_zone0/temp
# Check fan speed
vcgencmd measure_temp
```
CAUSE -> EFFECT:
- Fan not running -> Heat can't escape -> Temperature rises
- Pironman case closed -> Airflow blocked -> Heat trapped
3. CHECK OVERCLOCKING:
```bash
cat /boot/config.txt | grep over
vcgencmd measure_clock arm
```
CAUSE -> EFFECT:
- Overclocked CPU -> More power -> More heat
- No adequate cooling -> Heat accumulates
ROOT CAUSE DETERMINATION:
```bash
# Step 1: What's the temp?
vcgencmd measure_temp
# e.g. temp=75.0'C (high!)
# Step 2: What's using CPU?
ps aux | sort -rn -k 3 | head
# e.g. python3 using 90% CPU
# Step 3: Is fan working?
# Check Pironman OLED display
# Step 4: Is it overclocked?
cat /boot/config.txt
```
CAUSAL CHAIN IDENTIFIED:
Running quantum experiments (CPU intensive)
-> CPU at 100%
-> Generates heat
-> Fan can't keep up
-> Temperature rises
SOLUTION (address the cause):
1. Immediate: Stop intensive process
2. Short-term: Add cooling (external fan)
3. Long-term: Distribute work across fleet
4. Permanent: Better case ventilation
WHY THIS REASONING:
- Trace from effect back to cause
- Test each hypothesis
- Fix root cause, not just symptom
- Prevent recurrence
""",
"tags": ["reasoning", "causality", "debugging"]
}
]
},
"probabilistic_reasoning": {
"description": "Reasoning under uncertainty",
"examples": [
{
"prompt": "Should I cache this API response?",
"response": """Let me reason probabilistically:
UNCERTAINTY FACTORS:
- How often does data change?
- What's the cost of stale data?
- What's the cost of computing fresh data?
PROBABILISTIC ANALYSIS:
1. DATA CHANGE FREQUENCY:
- User profile: Changes rarely (days/weeks)
-> High confidence cache is valid
-> Low risk of stale data
- Stock price: Changes constantly (seconds)
-> Low confidence cache is valid
-> High risk of stale data
- Blog posts: Changes occasionally (hours/days)
-> Medium confidence
-> Medium risk
2. COST ANALYSIS:
P(cache_hit) = ?
- If high traffic to same data: 0.8 (80% chance)
- If unique data per user: 0.1 (10% chance)
Cost(database_query) = 50ms + $0.001
Cost(cache_lookup) = 1ms + $0.0001
Expected_cost(with_cache):
= P(hit) * Cost(cache) + P(miss) * (Cost(cache) + Cost(db))
= 0.8 * 1ms + 0.2 * 51ms = 10.8ms
Expected_cost(no_cache):
= Cost(db) = 50ms
SAVING: 50ms - 10.8ms = 39.2ms (78% faster!)
3. STALENESS RISK:
If data changes once per hour:
- Cache for 5 minutes: 8% risk of stale data
- Cache for 30 minutes: 50% risk
- Cache for 2 hours: 100% guaranteed stale
Risk_tolerance depends on use case:
- User settings: Low risk (cache 1 hour)
- Account balance: Medium risk (cache 5 min)
- Real-time prices: High risk (cache 10 sec)
DECISION MATRIX:
| | High Traffic | Low Traffic |
|--------------|--------------|-------------|
| Rarely changes | CACHE LONG | CACHE SHORT |
| Often changes | CACHE SHORT | NO CACHE |
FOR BLACKROAD:
- Agent status: Cache 10 seconds (changes often, high traffic)
- Memory entries: Cache 1 minute (append-only, medium traffic)
- Infrastructure map: Cache 5 minutes (rarely changes, low traffic)
WHY THIS REASONING:
- Quantify uncertainty with probabilities
- Calculate expected value
- Make data-driven decisions
- Balance multiple objectives (speed vs freshness)
""",
"tags": ["reasoning", "probability", "optimization"]
}
]
},
"analogical_reasoning": {
"description": "Reasoning by analogy",
"examples": [
{
"prompt": "How should I structure my distributed system?",
"response": """Let me reason by analogy:
ANALOGY 1: Distributed System = Restaurant Chain
Single Server:
- Like a single restaurant
- One location serves everyone
- If it breaks, everyone starves
- Gets overwhelmed at peak times
Distributed System:
- Like a restaurant chain
- Multiple locations serve different areas
- If one breaks, others still work
- Load distributed across locations
KEY INSIGHTS:
- Need coordination (shared menu = shared config)
- Need consistency (same food = same data)
- Need load balancing (customers to nearest location)
- Need communication (franchises share inventory)
ANALOGY 2: Microservices = City Departments
Monolith:
- City hall does EVERYTHING
- DMV, taxes, permits, police, fire all in one building
- Any change affects everyone
- One failure = total shutdown
Microservices:
- Separate departments
- DMV has its own building
- Police independent from DMV
- Can upgrade one without touching others
- If DMV closes, police still work
KEY INSIGHTS:
- Clear responsibilities (each service has one job)
- Independent scaling (busy DMV ≠ need more police)
- Isolated failures (DMV down ≠ police down)
- Communication needed (departments talk via APIs)
ANALOGY 3: Message Queue = Restaurant Order System
Without Queue:
- Customer yells order to kitchen
- Chef forgets if too many orders
- Chaos during rush
With Queue:
- Orders written on tickets
- Hung on a line
- Chef takes them one by one
- Orders never lost
- Can see backlog
KEY INSIGHTS:
- Decouples producer from consumer
- Handles bursts (many orders at once)
- Persistent (orders don't disappear)
- Ordering preserved (FIFO)
APPLYING TO BLACKROAD:
Structure = Restaurant Chain:
- Multiple Pis = Multiple locations
- Each serves local requests
- Shared configuration = Shared menu
- Load balancer = Customer routing
Services = City Departments:
- Memory service = Records department
- Agent service = HR department
- Deployment service = Construction department
- Each independent, all coordinated
Communication = Order System:
- NATS = Order ticket system
- Agents place "orders" (tasks)
- Workers process them
- Never lost, always handled
WHY THIS REASONING:
- Analogies make complex concepts simple
- Learn from familiar systems
- Transfer knowledge across domains
- Easier to remember and explain
""",
"tags": ["reasoning", "analogy", "architecture"]
}
]
},
"meta_reasoning": {
"description": "Reasoning about reasoning itself",
"examples": [
{
"prompt": "When should I optimize code?",
"response": """Let me reason about the reasoning process itself:
META-QUESTION: "When to optimize?"
This requires reasoning about:
1. What I know
2. What I don't know
3. How to decide
4. Whether I'm asking the right question
LEVEL 1: Direct Answer (naive)
"Always optimize for speed!"
- But this assumes speed is always the bottleneck
- Doesn't consider cost of optimization
- Missing context
LEVEL 2: Conditional Answer (better)
"Optimize when it's slow"
- But "slow" is subjective
- Doesn't say HOW to optimize
- Still missing context
LEVEL 3: Meta-Reasoning (best)
First, question the question:
- WHY am I asking this?
- What problem am I really trying to solve?
- Is optimization the right solution?
REASONING ABOUT REASONING:
Do I have the right information?
```
NEED TO KNOW:
- Current performance (measured, not guessed)
- User expectations (what's "fast enough"?)
- Cost of optimization (time, complexity, bugs)
- Business value (will this matter?)
QUESTIONS TO ASK:
1. Is it actually slow? (MEASURE)
2. Does slow = bad user experience? (ASK USERS)
3. Can I solve it without code changes? (INFRASTRUCTURE)
4. Is this the bottleneck? (PROFILE)
```
Am I asking the right question?
ORIGINAL: "When should I optimize?"
BETTER: "Is this performance issue worth solving?"
EVEN BETTER: "What's the highest-impact thing I can do right now?"
DECISION FRAMEWORK:
```python
def should_optimize():
# Meta-reasoning process
# 1. Do I have data?
if not has_measurements():
return "MEASURE FIRST, then decide"
# 2. Is it actually a problem?
if response_time < user_tolerance:
return "NO - users are happy"
# 3. Is this THE bottleneck?
if not is_primary_bottleneck():
return "NO - fix bigger issues first"
# 4. Can I solve without code?
if can_scale_infrastructure():
return "MAYBE - try infrastructure first (cheaper)"
# 5. What's the ROI?
if cost_to_optimize > value_of_speedup:
return "NO - not worth it"
# 6. All checks passed
return "YES - optimize now"
```
BLACKROAD EXAMPLE:
Question: "Should I optimize LLM inference speed?"
Meta-reasoning:
1. What do I know?
- Current: 156,342 tokens/sec
- Goal: ??? (I don't know!)
2. What should I ask?
- "Is 156K tok/s fast enough for my use case?"
- "What's the actual bottleneck I'm solving?"
3. Better question:
- "What user experience am I trying to enable?"
- If: Real-time chat -> Need <100ms latency -> Optimize!
- If: Batch processing -> Current speed fine -> Don't optimize!
WHY THIS REASONING:
- Question your assumptions
- Measure before optimizing
- Consider alternatives
- Think about thinking
- Ask better questions
The meta-lesson: Sometimes the right answer is "you're asking the wrong question"
""",
"tags": ["reasoning", "meta-cognition", "decision-making"]
}
]
},
"systems_thinking": {
"description": "Understanding complex systems and feedback loops",
"examples": [
{
"prompt": "Why did adding more servers make the system slower?",
"response": """Let me apply systems thinking:
COUNTERINTUITIVE RESULT:
More servers -> Slower system (WHY?!)
SYSTEMS THINKING APPROACH:
1. IDENTIFY THE SYSTEM:
Components:
- Load balancer
- Application servers (now more of them)
- Database
- Cache
Connections:
- LB -> Servers
- Servers -> Database
- Servers -> Cache
2. FIND FEEDBACK LOOPS:
LOOP 1: More servers -> More database connections
- Each server opens N connections to database
- 5 servers * 10 connections = 50
- 10 servers * 10 connections = 100
- Database has max 100 connections
- Now at limit! Connections wait
- NEGATIVE FEEDBACK: More servers make it worse!
LOOP 2: Cache stampede
- More servers -> More cache misses at same time
- All servers query database simultaneously
- Database overwhelmed
- Responses slow
- Servers timeout and retry
- POSITIVE FEEDBACK: Makes problem worse!
LOOP 3: Load balancer overhead
- More servers -> More health checks
- More routing decisions
- More connection management
- LB becomes bottleneck
- CONSTRAINT: LB can't handle it!
3. IDENTIFY BOTTLENECKS:
Original system (2 servers):
```
LB (1000 req/s) -> Servers (500 req/s each) -> DB (1000 req/s)
Balanced! Everything at ~50% capacity
```
After scaling (10 servers):
```
LB (1000 req/s) -> Servers (200 req/s each) -> DB (1000 req/s)
LB: 100% capacity (bottleneck!)
Servers: 20% capacity (underutilized)
DB: 150% capacity (overloaded!)
```
4. UNINTENDED CONSEQUENCES:
Intended: Distribute load across more servers
Actual: Created new bottlenecks
- Database connection pool exhausted
- Cache invalidation storms
- Load balancer maxed out
- Coordination overhead
5. SYSTEMIC SOLUTION:
Don't just add servers! Consider whole system:
```bash
# 1. Scale database connections
# Increase connection pool
DB_POOL_SIZE = num_servers * 5
# 2. Add read replicas
# Distribute read load
REPLICA_1=postgres-read-1
REPLICA_2=postgres-read-2
# 3. Improve caching strategy
# Reduce database hits
CACHE_TTL=300 # 5 minutes
CACHE_STRATEGY=write-through
# 4. Add connection pooling
# Reuse connections
PGBOUNCER=enabled
# 5. Horizontal database scaling
# Shard data
SHARD_KEY=user_id
SHARDS=4
```
6. SYSTEMS MAP:
```
[Load Balancer]
|
+----------+----------+
| | |
[Server1] [Server2] [Server3]
| | |
+----------+----------+
|
[PgBouncer] <- Connection pooler
|
+----------+----------+
| | |
[DB-Primary] [DB-Replica-1] [DB-Replica-2]
```
WHY THIS REASONING:
- Systems have non-linear behavior
- Parts interact in unexpected ways
- Feedback loops amplify problems
- Must consider whole system
- Local optimizations can hurt globally
THE LESSON:
"More" isn't always better. Understand the system!
""",
"tags": ["reasoning", "systems-thinking", "debugging"]
}
]
}
}
# ============================================================================
# SAVE REASONING TEMPLATES
# ============================================================================
# Bundle the templates with run metadata and summary statistics, persist the
# bundle as JSON, then print a human-readable report of what was saved.
type_count = len(reasoning_templates)
example_count = sum(len(spec["examples"]) for spec in reasoning_templates.values())
reasoning_data = {
    "metadata": {
        "created": datetime.now().isoformat(),
        "version": "1.0",
        "purpose": "Teach LLM to reason, not just pattern-match",
        "reasoning_types": type_count
    },
    "templates": reasoning_templates,
    "stats": {
        "total_types": type_count,
        "total_examples": example_count,
        "types": list(reasoning_templates.keys())
    }
}
with open('blackroad_reasoning_templates.json', 'w') as f:
    json.dump(reasoning_data, f, indent=2)
print("📊 REASONING TEMPLATE STATISTICS")
print("=" * 70)
print()
print(f"Reasoning types: {type_count}")
print(f"Total examples: {example_count}")
print()
for kind, spec in reasoning_templates.items():
    print(f"🧠 {kind.upper().replace('_', ' ')}:")
    print(f" Examples: {len(spec['examples'])}")
    print(f" Description: {spec['description']}")
    print()
print("💾 Saved to: blackroad_reasoning_templates.json")
print()
print("=" * 70)
print("🎓 REASONING TRAINING DATA READY!")
print("=" * 70)
print()
print("✅ Chain-of-thought reasoning")
print("✅ Comparative analysis")
print("✅ Causal reasoning")
print("✅ Probabilistic thinking")
print("✅ Analogical reasoning")
print("✅ Meta-reasoning (thinking about thinking)")
print("✅ Systems thinking")
print()
print("🚀 This teaches the LLM to THINK, not just pattern-match!")

View File

@@ -0,0 +1,661 @@
#!/usr/bin/env python3
"""
RoadChain Security Scanner — attack yourself to defend yourself.
Scans the BlackRoad fleet, audits hardening, registers device identities,
and generates actionable security reports. All results SHA-2048 signed.
Usage:
python3 roadchain-security-scan.py # Full fleet scan + audit
python3 roadchain-security-scan.py --local # Audit local machine only
python3 roadchain-security-scan.py --scan <host> # Scan specific host
python3 roadchain-security-scan.py --discover # Discover unknown devices
python3 roadchain-security-scan.py --fleet # Register + scan fleet
python3 roadchain-security-scan.py --harden <host> # Audit remote host
python3 roadchain-security-scan.py --scores # Show security scores
python3 roadchain-security-scan.py --alerts # Show active alerts
python3 roadchain-security-scan.py --report # Full security report
BlackRoad OS, Inc. 2026
"""
import sys
import time
from roadchain.security.scanner import NetworkScanner, FLEET
from roadchain.security.hardening import HardeningAuditor
from roadchain.security.device_identity import DeviceRegistry
# ── Colors ────────────────────────────────────────────────────────────
# ANSI 256-color escape sequences for terminal output. Each constant opens
# a color; RESET restores the terminal default. DIM/WHITE use basic SGR
# attributes rather than the 256-color palette.
PINK = "\033[38;5;205m"
AMBER = "\033[38;5;214m"
BLUE = "\033[38;5;69m"
VIOLET = "\033[38;5;135m"
GREEN = "\033[38;5;82m"
WHITE = "\033[1;37m"
DIM = "\033[2m"
RED = "\033[38;5;196m"
YELLOW = "\033[38;5;220m"
CYAN = "\033[38;5;87m"
RESET = "\033[0m"
def banner():
    """Print the scanner's ANSI-colored ASCII-art banner to stdout."""
    print(f"""
{PINK}╔══════════════════════════════════════════════════════════════╗{RESET}
{PINK}{RESET} {WHITE}ROADCHAIN SECURITY SCANNER{RESET}{AMBER}SHA-2048{RESET} {PINK}{RESET}
{PINK}{RESET} {DIM}attack yourself → defend yourself{RESET} {PINK}{RESET}
{PINK}╚══════════════════════════════════════════════════════════════╝{RESET}
""")
def severity_color(severity: str) -> str:
    """Return the display color for a finding severity (RESET if unknown)."""
    palette = dict(critical=RED, high=AMBER, medium=YELLOW, low=CYAN, info=DIM)
    return palette.get(severity, RESET)
def score_color(score: int) -> str:
    """Return a color for a 0-100 security score (higher scores are greener)."""
    # Bands: >=90 green, >=70 yellow, >=50 amber, otherwise red.
    for threshold, color in ((90, GREEN), (70, YELLOW), (50, AMBER)):
        if score >= threshold:
            return color
    return RED
def grade_color(grade: str) -> str:
    """Return the display color for a letter grade (RESET if unrecognized)."""
    if grade in ("A", "B"):
        return GREEN
    if grade == "C":
        return YELLOW
    if grade == "D":
        return AMBER
    if grade == "F":
        return RED
    return RESET
# ── Commands ──────────────────────────────────────────────────────────
def cmd_local_audit():
    """Audit local machine security.

    Runs the hardening auditor against the local host, prints a graded
    score plus every finding (sorted by severity), then records the score
    for this machine in the device registry.
    """
    banner()
    print(f"{WHITE}Auditing local machine...{RESET}\n")
    auditor = HardeningAuditor()
    report = auditor.audit_local()
    gc = grade_color(report.grade)
    sc = score_color(report.score)
    print(f" {WHITE}Security Score:{RESET} {sc}{report.score}/100{RESET} Grade: {gc}{report.grade}{RESET}")
    print(f" Checks: {report.checks_passed}/{report.checks_run} passed")
    print()
    if report.findings:
        print(f" {WHITE}Findings ({len(report.findings)}):{RESET}")
        print(f" {'' * 70}")
        # Worst severities first; unknown severities sort last (rank 5).
        for f in sorted(report.findings, key=lambda x: {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get(x.severity, 5)):
            sc2 = severity_color(f.severity)
            print(f" {sc2}[{f.severity.upper():8}]{RESET} {f.title}")
            print(f" {DIM}{f.detail}{RESET}")
            print(f" {CYAN}Fix:{RESET} {f.fix}")
            print()
    else:
        print(f" {GREEN}No findings — all checks passed{RESET}")
    # Update device registry
    # NOTE(review): this machine's identity is hard-coded here ("alexandria",
    # fixed IPs/hardware) — confirm it should not be derived from the host.
    dev_reg = DeviceRegistry()
    dev_reg.register(
        name="alexandria", device_type="mac",
        local_ip="192.168.4.28", tailscale_ip="100.91.90.68",
        hardware="M1 MacBook Pro",
    )
    dev_reg.update_score("alexandria", report.score)
    dev_reg.close()
def cmd_scan_host(host: str):
    """Port-scan a single host and pretty-print the scan result."""
    banner()
    print(f"{WHITE}Scanning {host}...{RESET}\n")
    net = NetworkScanner()
    outcome = net.scan_host(host)
    net.close()
    _print_scan_result(outcome)
def cmd_discover():
    """Sweep the local /24 subnet and separate fleet members from strangers."""
    banner()
    print(f"{WHITE}Discovering devices on 192.168.4.0/24...{RESET}")
    print(f"{DIM}(scanning .1 through .100){RESET}\n")
    net = NetworkScanner()
    hosts = net.scan_subnet("192.168.4", 1, 100)
    net.close()
    # A host counts as "known" if its hostname or IP matches the fleet roster.
    fleet_ips = {entry["local"] for entry in FLEET.values()}
    known, unknown = [], []
    for h in hosts:
        bucket = known if (h.hostname in FLEET or h.ip in fleet_ips) else unknown
        bucket.append(h)
    print(f" {GREEN}Known devices ({len(known)}):{RESET}")
    for h in known:
        tone = score_color(h.score)
        label = h.hostname or h.ip
        ports = ", ".join(f"{p.port}/{p.service}" for p in h.open_ports[:5])
        print(f" {tone}[{h.score:3d}]{RESET} {label:<20} {h.ip:<16} {ports}")
    print()
    if not unknown:
        print(f" {GREEN}No unknown devices detected{RESET}")
        return
    print(f" {RED}UNKNOWN devices ({len(unknown)}):{RESET}")
    for h in unknown:
        ports = ", ".join(f"{p.port}/{p.service}" for p in h.open_ports[:5])
        print(f" {AMBER}[???]{RESET} {h.ip:<16} {ports}")
        for note in h.notes:
            print(f" {DIM}{note}{RESET}")
    print()
    print(f" {AMBER}Action:{RESET} Investigate unknown devices — they may be:")
    print(f" - IoT devices (TV, printer, smart home)")
    print(f" - Unauthorized devices on your network")
    print(f" - Neighbors' devices on shared wifi")
def cmd_fleet_scan():
    """Register and scan the entire fleet.

    Registers every fleet device's identity, port-scans each one (local IP
    for on-prem devices, Tailscale IP for cloud droplets), runs a hardening
    audit on the local machine, and prints a fleet-wide security summary
    including vulnerabilities and critical/high local findings.
    """
    banner()
    print(f"{WHITE}Scanning BlackRoad Fleet...{RESET}\n")
    dev_reg = DeviceRegistry()
    scanner = NetworkScanner()
    auditor = HardeningAuditor()
    # Register all fleet devices
    # NOTE(review): this roster duplicates the imported FLEET constant with
    # different key names ("ts" vs what cmd_full_report reads as "tailscale")
    # — confirm the two stay in sync.
    fleet_info = {
        "alexandria": {"type": "mac", "local": "192.168.4.28", "ts": "100.91.90.68", "hw": "M1 MacBook Pro"},
        "alice": {"type": "pi", "local": "192.168.4.49", "ts": "100.77.210.18", "hw": "Pi 5 8GB"},
        "lucidia": {"type": "pi", "local": "192.168.4.81", "ts": "100.83.149.86", "hw": "Pi 5 8GB + Hailo-8"},
        "aria": {"type": "pi", "local": "192.168.4.82", "ts": "100.109.14.17", "hw": "Pi 5 8GB"},
        "cecilia": {"type": "pi", "local": "192.168.4.89", "ts": "100.72.180.98", "hw": "Pi 5 8GB + Hailo-8 + 500GB NVMe"},
        "octavia": {"type": "pi", "local": "192.168.4.38", "ts": "100.66.235.47", "hw": "Pi 5 8GB"},
        "shellfish": {"type": "cloud", "local": "174.138.44.45", "ts": "100.94.33.37", "hw": "DigitalOcean Droplet"},
        "gematria": {"type": "cloud", "local": "159.65.43.12", "ts": "100.108.132.8", "hw": "DigitalOcean Droplet"},
    }
    print(f" {WHITE}Registering fleet identities (SHA-2048)...{RESET}")
    for name, info in fleet_info.items():
        device = dev_reg.register(
            name=name, device_type=info["type"],
            local_ip=info["local"], tailscale_ip=info["ts"],
            hardware=info["hw"],
        )
        print(f" {GREEN}REG{RESET} {name:<16} {device.short_id} ({info['type']})")
    print()
    print(f" {WHITE}Port scanning fleet (local IPs)...{RESET}")
    print(f" {'' * 70}")
    results = []
    for name, info in fleet_info.items():
        ip = info["local"]
        # Skip cloud IPs for local scan (use Tailscale for those)
        if info["type"] == "cloud":
            ip = info["ts"]
        result = scanner.scan_host(ip, use_nmap=True)
        result.hostname = name
        results.append(result)
        alive_str = f"{GREEN}UP{RESET}" if result.alive else f"{RED}DOWN{RESET}"
        sc = score_color(result.score)
        ports = ", ".join(f"{p.port}" for p in result.open_ports[:6])
        vulns = len(result.vulnerabilities)
        vuln_str = f" {RED}{vulns} vulns{RESET}" if vulns else ""
        print(f" {alive_str} {name:<16} {sc}[{result.score:3d}]{RESET} ports: {ports or 'none'}{vuln_str}")
        # Update device score
        dev_reg.update_score(name, result.score)
        if not result.alive:
            dev_reg.mark_offline(name)
    # Audit local machine
    print()
    print(f" {WHITE}Hardening audit (local)...{RESET}")
    local_report = auditor.audit_local()
    gc = grade_color(local_report.grade)
    print(f" alexandria: {gc}{local_report.grade}{RESET} ({local_report.score}/100) — {local_report.checks_passed}/{local_report.checks_run} checks passed")
    dev_reg.update_score("alexandria", local_report.score)
    # Fleet summary
    print()
    stats = dev_reg.stats()
    fleet_scores = scanner.fleet_score()
    print(f"{PINK}{'' * 70}{RESET}")
    print(f"{WHITE}FLEET SECURITY SUMMARY{RESET}")
    print(f"{PINK}{'' * 70}{RESET}")
    print(f" Devices: {stats['active']} active / {stats['total']} total")
    print(f" Avg Score: {score_color(int(fleet_scores['average']))}{fleet_scores['average']:.0f}/100{RESET}")
    if fleet_scores.get("weakest"):
        w = fleet_scores["weakest"]
        # Weakest entry's key name varies; try "hostname" then "host".
        print(f" Weakest: {RED}{w.get('hostname', w.get('host', ''))}: {w['score']}{RESET}")
    print(f" Types: {stats['types']}")
    # Print critical findings
    all_vulns = []
    for r in results:
        for v in r.vulnerabilities:
            v["host"] = r.hostname
            all_vulns.append(v)
    if all_vulns:
        print()
        print(f" {RED}Vulnerabilities Found ({len(all_vulns)}):{RESET}")
        # Worst severities first; anything unranked sorts last.
        for v in sorted(all_vulns, key=lambda x: {"critical": 0, "high": 1, "medium": 2}.get(x.get("severity", ""), 3)):
            sc2 = severity_color(v.get("severity", "info"))
            print(f" {sc2}[{v.get('severity', 'info').upper():8}]{RESET} {v.get('host', '')}: {v.get('issue', v.get('detail', ''))}")
            if v.get("fix"):
                print(f" {CYAN}Fix:{RESET} {v['fix']}")
    if local_report.findings:
        print()
        critical = [f for f in local_report.findings if f.severity in ("critical", "high")]
        if critical:
            print(f" {AMBER}Local hardening issues ({len(critical)} critical/high):{RESET}")
            for f in critical:
                sc2 = severity_color(f.severity)
                print(f" {sc2}[{f.severity.upper():8}]{RESET} {f.title}")
                print(f" {CYAN}Fix:{RESET} {f.fix}")
    print()
    print(f" {DIM}All scans SHA-2048 signed and logged to ~/.roadchain-l1/security-scans.db{RESET}")
    print(f" {DIM}identity > provider — device identity is permanent{RESET}")
    scanner.close()
    dev_reg.close()
def cmd_harden_remote(host: str):
    """Audit a remote host's hardening over SSH.

    *host* may be a fleet alias (resolved to its local IP via FLEET) or a
    raw address, which is used as-is. Prints a graded score and each
    finding sorted by severity, flagging the ones that can be auto-fixed.
    """
    banner()
    print(f"{WHITE}Hardening audit: {host}...{RESET}\n")
    # Resolve host alias to user@ip
    user = "blackroad"
    # FLEET maps hostname -> connection info; a plain dict lookup replaces
    # the previous linear scan over FLEET.items(). Unknown names fall back
    # to being treated as a raw address.
    info = FLEET.get(host)
    target = info["local"] if info else host
    auditor = HardeningAuditor()
    report = auditor.audit_remote(target, user)
    gc = grade_color(report.grade)
    sc = score_color(report.score)
    print(f" {WHITE}Security Score:{RESET} {sc}{report.score}/100{RESET} Grade: {gc}{report.grade}{RESET}")
    print(f" Checks: {report.checks_passed}/{report.checks_run} passed")
    print()
    if report.findings:
        print(f" {WHITE}Findings ({len(report.findings)}):{RESET}")
        print(f" {'' * 70}")
        # Worst severities first; unknown severities sort last (rank 5).
        for f in sorted(report.findings, key=lambda x: {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get(x.severity, 5)):
            sc2 = severity_color(f.severity)
            auto = f" {GREEN}[auto-fix]{RESET}" if f.automated else ""
            print(f" {sc2}[{f.severity.upper():8}]{RESET} {f.title}{auto}")
            print(f" {DIM}{f.detail}{RESET}")
            print(f" {CYAN}Fix:{RESET} {f.fix}")
            print()
        # Show auto-fixable count
        auto_fixable = [f for f in report.findings if f.automated]
        if auto_fixable:
            print(f" {GREEN}{len(auto_fixable)} findings can be auto-fixed{RESET}")
    else:
        print(f" {GREEN}All checks passed{RESET}")
def cmd_scores():
    """Show fleet security scores.

    Reads the device registry and prints a table of per-device scores,
    derived letter grades, status, and SHA-2048 short identity, sorted
    weakest-first, followed by the fleet average.
    """
    banner()
    dev_reg = DeviceRegistry()
    devices = dev_reg.list_all()
    stats = dev_reg.stats()
    dev_reg.close()
    if not devices:
        print(f" {DIM}No devices registered. Run: python3 roadchain-security-scan.py --fleet{RESET}")
        return
    print(f" {WHITE}Fleet Security Scores{RESET}")
    print(f" {'' * 60}")
    print(f" {'Device':<16} {'Type':<8} {'Score':>6} {'Grade':>6} {'Status':<10} {'SHA-2048 ID'}")
    print(f" {'' * 60}")
    for d in sorted(devices, key=lambda x: x.security_score):
        sc = score_color(d.security_score)
        # Letter grade on 10-point bands (90/80/70/60). NOTE(review): these
        # bands differ from score_color's 90/70/50 color bands — confirm
        # whether that divergence is intentional.
        grade = "A" if d.security_score >= 90 else "B" if d.security_score >= 80 else "C" if d.security_score >= 70 else "D" if d.security_score >= 60 else "F"
        gc = grade_color(grade)
        status_color = GREEN if d.status == "active" else RED
        print(f" {d.name:<16} {d.device_type:<8} {sc}{d.security_score:>5}{RESET} {gc}{grade:>5}{RESET} {status_color}{d.status:<10}{RESET} {d.short_id}")
    print(f" {'' * 60}")
    print(f" Average: {score_color(int(stats['average_score']))}{stats['average_score']:.0f}/100{RESET}")
    if stats.get("weakest"):
        print(f" Weakest: {RED}{stats['weakest']['name']}: {stats['weakest']['security_score']}{RESET}")
def cmd_alerts():
    """Show active security alerts.

    Fetches unacknowledged alerts from the scanner's store and prints each
    with its severity color, originating host, message, and timestamp.
    """
    banner()
    scanner = NetworkScanner()
    alerts = scanner.get_alerts(unacknowledged_only=True)
    scanner.close()
    if not alerts:
        print(f" {GREEN}No active alerts{RESET}")
        return
    print(f" {RED}Active Security Alerts ({len(alerts)}):{RESET}")
    print(f" {'' * 70}")
    for a in alerts:
        sc = severity_color(a["severity"])
        # "created_at" is presumably epoch seconds — TODO confirm against
        # the scanner's alert schema.
        ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(a["created_at"]))
        print(f" {sc}[{a['severity'].upper():8}]{RESET} {a['host']}: {a['message']}")
        print(f" {DIM}{ts}{RESET}")
    print()
def cmd_full_report():
    """Generate comprehensive security report.

    Runs four phases — local hardening audit, fleet port scan, SSH
    hardening audits of reachable fleet devices, and a subnet discovery
    sweep — then aggregates everything into vulnerability counts, a fleet
    overview, and an overall letter grade, updating the device registry's
    scores and heartbeats along the way.
    """
    banner()
    print(f"{WHITE}Generating Full Security Report...{RESET}\n")
    # 1. Local audit
    print(f"{WHITE}Phase 1: Local Machine Audit{RESET}")
    print(f"{'' * 50}")
    auditor = HardeningAuditor()
    local_report = auditor.audit_local()
    gc = grade_color(local_report.grade)
    print(f" Score: {gc}{local_report.score}/100 ({local_report.grade}){RESET}")
    print(f" Findings: {len(local_report.findings)}")
    print()
    # 2. Fleet scan
    print(f"{WHITE}Phase 2: Fleet Port Scan{RESET}")
    print(f"{'' * 50}")
    scanner = NetworkScanner()
    fleet_results = []
    for name, info in FLEET.items():
        # Use local IP for local devices, Tailscale for cloud
        # NOTE(review): reads a "tailscale" key here while cmd_fleet_scan's
        # roster uses "ts" — confirm FLEET's actual schema.
        if info["type"] == "cloud":
            ip = info.get("tailscale", info["local"])
        else:
            ip = info["local"]
        result = scanner.scan_host(ip, use_nmap=True)
        result.hostname = name
        fleet_results.append(result)
        alive = f"{GREEN}UP{RESET}" if result.alive else f"{RED}DOWN{RESET}"
        sc = score_color(result.score)
        ports = ", ".join(f"{p.port}/{p.service}" for p in result.open_ports[:6])
        vulns = len(result.vulnerabilities)
        vuln_str = f" {RED}{vulns} vulns{RESET}" if vulns else ""
        print(f" {alive} {name:<16} {sc}[{result.score:3d}]{RESET} ports: {ports or 'none'}{vuln_str}")
    print()
    # 2.5. Remote hardening audits for alive fleet devices
    # The local machine is audited in phase 1, so it is excluded here.
    alive_fleet = [r for r in fleet_results if r.alive and r.hostname != "alexandria"]
    harden_reports = {}
    if alive_fleet:
        print(f"{WHITE}Phase 2b: Remote Hardening Audits (SSH){RESET}")
        print(f"{'' * 50}")
        for r in alive_fleet:
            name = r.hostname
            ip = r.ip
            report_r = auditor.audit_remote(ip, "blackroad")
            harden_reports[name] = report_r
            hc = grade_color(report_r.grade)
            print(f" {name:<16} {hc}{report_r.grade}{RESET} ({report_r.score}/100) {report_r.checks_passed}/{report_r.checks_run} checks {len(report_r.findings)} findings")
        print()
    # 3. Subnet discovery
    print(f"{WHITE}Phase 3: Subnet Discovery (192.168.4.1-100){RESET}")
    print(f"{'' * 50}")
    subnet_results = scanner.scan_subnet("192.168.4", 1, 100)
    known_ips = {info["local"] for info in FLEET.values()}
    unknown = [r for r in subnet_results if r.ip not in known_ips]
    known_found = [r for r in subnet_results if r.ip in known_ips]
    print(f" Total hosts found: {len(subnet_results)}")
    print(f" Known fleet: {GREEN}{len(known_found)}{RESET}")
    print(f" Unknown: {AMBER}{len(unknown)}{RESET}")
    print()
    if unknown:
        print(f" {AMBER}Unknown Devices:{RESET}")
        for u in unknown:
            ports = ", ".join(f"{p.port}/{p.service}" for p in u.open_ports[:6])
            risk = RED if u.open_ports else DIM
            label = "SERVICES EXPOSED" if u.open_ports else "stealth/filtered"
            print(f" {risk}{u.ip:<16}{RESET} {ports or label}")
        print()
    # 4. Comprehensive Summary
    # Tag each port-scan vulnerability with its host for the combined list.
    all_vulns = []
    for r in fleet_results:
        for v in r.vulnerabilities:
            v["host"] = r.hostname
            all_vulns.append(v)
    # Include remote hardening findings
    remote_findings = []
    for name, hr in harden_reports.items():
        for f in hr.findings:
            if f.severity in ("critical", "high", "medium"):
                remote_findings.append((name, f))
    # Count totals and critical/high across all three finding sources.
    total_findings = len(local_report.findings) + len(all_vulns) + len(remote_findings)
    critical = sum(1 for v in all_vulns if v.get("severity") == "critical")
    critical += sum(1 for f in local_report.findings if f.severity == "critical")
    critical += sum(1 for _, f in remote_findings if f.severity == "critical")
    high = sum(1 for v in all_vulns if v.get("severity") == "high")
    high += sum(1 for f in local_report.findings if f.severity == "high")
    high += sum(1 for _, f in remote_findings if f.severity == "high")
    alive_count = sum(1 for r in fleet_results if r.alive)
    dead_count = len(fleet_results) - alive_count
    alive_scores = [r.score for r in fleet_results if r.alive]
    avg_score = sum(alive_scores) / len(alive_scores) if alive_scores else 0
    # Device registry update
    dev_reg = DeviceRegistry()
    for r in fleet_results:
        if r.alive:
            dev_reg.update_score(r.hostname, r.score)
            dev_reg.heartbeat(r.hostname)
        else:
            dev_reg.mark_offline(r.hostname)
    dev_reg.update_score("alexandria", local_report.score)
    dev_reg.heartbeat("alexandria")
    dev_reg.close()
    print(f"{PINK}{'' * 70}{RESET}")
    print(f"{WHITE}BLACKROAD SECURITY REPORT — {time.strftime('%Y-%m-%d %H:%M')}{RESET}")
    print(f"{PINK}{'' * 70}{RESET}")
    print()
    # Fleet overview
    print(f" {WHITE}FLEET STATUS{RESET}")
    print(f" {'' * 50}")
    print(f" Online: {GREEN}{alive_count}{RESET}/{len(FLEET)} devices")
    print(f" Offline: {RED}{dead_count}{RESET} devices")
    for r in fleet_results:
        alive_str = f"{GREEN}UP{RESET}" if r.alive else f"{RED}DOWN{RESET}"
        sc = score_color(r.score) if r.alive else DIM
        ports_str = ", ".join(f"{p.port}" for p in r.open_ports[:6]) if r.open_ports else "---"
        grade_r = harden_reports.get(r.hostname)
        harden_str = f" harden:{grade_color(grade_r.grade)}{grade_r.grade}{RESET}" if grade_r else ""
        print(f" {alive_str} {r.hostname:<14} {sc}[{r.score:3d}]{RESET} ports: {ports_str}{harden_str}")
    print()
    # Local machine
    print(f" {WHITE}LOCAL MACHINE (alexandria){RESET}")
    print(f" {'' * 50}")
    gc = grade_color(local_report.grade)
    print(f" Score: {gc}{local_report.score}/100 ({local_report.grade}){RESET}")
    print(f" Checks: {local_report.checks_passed}/{local_report.checks_run} passed")
    for f in local_report.findings:
        if f.severity != "info":
            sc2 = severity_color(f.severity)
            print(f" {sc2}[{f.severity.upper():8}]{RESET} {f.title}")
    print()
    # Network
    print(f" {WHITE}NETWORK{RESET}")
    print(f" {'' * 50}")
    print(f" Subnet hosts: {len(subnet_results)} found on 192.168.4.0/24")
    print(f" Known fleet: {GREEN}{len(known_found)}{RESET}")
    print(f" Unknown: {AMBER}{len(unknown)}{RESET}")
    risky_unknown = [u for u in unknown if u.open_ports]
    if risky_unknown:
        print(f" Risky unknown: {RED}{len(risky_unknown)}{RESET} (with open ports)")
        for u in risky_unknown:
            ports = ", ".join(f"{p.port}/{p.service}" for p in u.open_ports)
            print(f" {RED}>{RESET} {u.ip}: {ports}")
    print()
    # Vulnerabilities
    print(f" {WHITE}VULNERABILITIES{RESET}")
    print(f" {'' * 50}")
    print(f" Total findings: {total_findings}")
    print(f" Critical: {RED}{critical}{RESET}")
    print(f" High: {AMBER}{high}{RESET}")
    print()
    if critical > 0 or high > 0:
        print(f" {RED}ACTION REQUIRED:{RESET}")
        # Port scan vulns
        for v in sorted(all_vulns, key=lambda x: {"critical": 0, "high": 1, "medium": 2}.get(x.get("severity", ""), 3)):
            if v.get("severity") in ("critical", "high"):
                sc2 = severity_color(v["severity"])
                print(f" {sc2}[{v['severity'].upper():8}]{RESET} {v.get('host', '')}: {v.get('issue', '')}")
                if v.get("fix"):
                    print(f" {CYAN}Fix:{RESET} {v['fix']}")
        # Remote hardening vulns
        for name, f in sorted(remote_findings, key=lambda x: {"critical": 0, "high": 1, "medium": 2}.get(x[1].severity, 3)):
            if f.severity in ("critical", "high"):
                sc2 = severity_color(f.severity)
                auto = f" {GREEN}[auto-fix]{RESET}" if f.automated else ""
                print(f" {sc2}[{f.severity.upper():8}]{RESET} {name}: {f.title}{auto}")
                print(f" {CYAN}Fix:{RESET} {f.fix}")
        # Local vulns
        for f in local_report.findings:
            if f.severity in ("critical", "high"):
                sc2 = severity_color(f.severity)
                print(f" {sc2}[{f.severity.upper():8}]{RESET} localhost: {f.title}")
                print(f" {CYAN}Fix:{RESET} {f.fix}")
        print()
    # Overall score
    # Overall grade averages the fleet mean with the local machine's score.
    overall = int((avg_score + local_report.score) / 2) if alive_scores else local_report.score
    oc = score_color(overall)
    og = "A" if overall >= 90 else "B" if overall >= 80 else "C" if overall >= 70 else "D" if overall >= 60 else "F"
    print(f" {PINK}{'' * 50}{RESET}")
    print(f" {WHITE}OVERALL SECURITY GRADE:{RESET} {grade_color(og)}{og}{RESET} ({oc}{overall}/100{RESET})")
    print(f" {PINK}{'' * 50}{RESET}")
    print()
    print(f" {DIM}All {len(fleet_results) + len(subnet_results) + 1} scans SHA-2048 signed{RESET}")
    print(f" {DIM}Stored: ~/.roadchain-l1/security-scans.db{RESET}")
    print(f" {DIM}Devices: ~/.roadchain-l1/device-identity.db{RESET}")
    print(f" {DIM}attack yourself → defend yourself{RESET}")
    scanner.close()
def _print_scan_result(result):
    """Render a single host's scan result as a terminal summary."""
    if result.alive:
        status = f"{GREEN}UP{RESET}"
    else:
        status = f"{RED}DOWN{RESET}"
    tone = score_color(result.score)
    print(f" Host: {result.hostname or result.host}")
    print(f" IP: {result.ip}")
    print(f" Status: {status}")
    print(f" Score: {tone}{result.score}/100{RESET}")
    print(f" Scan ID: {result.short_id}")
    print(f" Time: {result.scan_time:.2f}s")
    print()
    if result.open_ports:
        print(f" {WHITE}Open Ports:{RESET}")
        for port in result.open_ports:
            ver = f" ({port.version})" if port.version else ""
            greeting = f" [{port.banner[:40]}]" if port.banner else ""
            print(f" {port.port:>5}/tcp {port.service:<12} {port.state}{ver}{greeting}")
        print()
    if result.vulnerabilities:
        print(f" {RED}Vulnerabilities:{RESET}")
        for vuln in result.vulnerabilities:
            shade = severity_color(vuln.get("severity", "info"))
            print(f" {shade}[{vuln.get('severity', 'info').upper()}]{RESET} {vuln.get('issue', vuln.get('detail', ''))}")
            if vuln.get("fix"):
                print(f" {CYAN}Fix:{RESET} {vuln['fix']}")
        print()
    if result.notes:
        print(f" {DIM}Notes:{RESET}")
        for line in result.notes:
            print(f" {DIM}{line}{RESET}")
# ── Main ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    argv = sys.argv[1:]

    def _flag_value(flag: str, usage: str) -> str:
        """Return the argument following *flag*; print usage and exit if missing."""
        pos = argv.index(flag)
        value = argv[pos + 1] if pos + 1 < len(argv) else None
        if not value:
            print(usage)
            sys.exit(1)
        return value

    if "--local" in argv:
        cmd_local_audit()
    elif "--scan" in argv:
        cmd_scan_host(_flag_value("--scan", "Usage: --scan <host>"))
    elif "--discover" in argv:
        cmd_discover()
    elif "--fleet" in argv:
        cmd_fleet_scan()
    elif "--harden" in argv:
        cmd_harden_remote(_flag_value("--harden", "Usage: --harden <host>"))
    elif "--scores" in argv:
        cmd_scores()
    elif "--alerts" in argv:
        cmd_alerts()
    elif "--report" in argv:
        cmd_full_report()
    else:
        # No recognized flag: default to a full fleet scan.
        cmd_fleet_scan()