mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-17 04:57:15 -05:00
This commit introduces the QLM system - a stateful semantic layer for
tracking HI (Human Intelligence), AI (Agent Intelligence), and QI
(Quantum/Emergent Intelligence) in BlackRoad OS.
Core Features:
- HI/AI/QI intelligence layer modeling
- Event-driven state management
- QI emergence detection (agent self-correction, feedback loops, etc.)
- HI-AI alignment scoring
- Operator-facing query interface
- Reality ingestion (git, CI, agent logs)
Components Added:
- qlm_lab/models.py: Core data models (Actor, QLMEvent, QIEmergence, etc.)
- qlm_lab/state.py: State management and transition tracking
- qlm_lab/api.py: Public QLMInterface API
- qlm_lab/ingestion/: Git, CI, and agent log connectors
- qlm_lab/experiments/: Alignment and emergence validation
- qlm_lab/visualization.py: Timeline, actor graph, alignment plots
- qlm_lab/demo.py: Interactive demo script
- tests/test_qlm_core.py: Comprehensive test suite
- docs/QLM.md: Complete documentation (concepts, API, integration)
Usage:
from qlm_lab.api import QLMInterface
qlm = QLMInterface()
qlm.record_operator_intent("Build feature X")
qlm.record_agent_execution("agent-1", "Implement X", "task-1")
summary = qlm.get_summary(days=7)
Run:
python -m qlm_lab.demo
python -m qlm_lab.experiments.alignment_detection
pytest tests/test_qlm_core.py -v
Integrates with:
- cognitive/intent_graph.py (intent tracking)
- cognitive/agent_coordination.py (multi-agent coordination)
- operator_engine/scheduler.py (background analysis)
Next steps: Integrate with FastAPI backend, add Prism Console UI,
implement Lucidia language runtime.
511 lines
17 KiB
Python
511 lines
17 KiB
Python
"""
|
|
QLM State Management - The core state machine
|
|
|
|
QLMState represents the complete state of the Quantum Intelligence system at a point in time.
|
|
|
|
State includes:
|
|
- All intelligence layers (HI, AI, QI)
|
|
- All actors and their current state
|
|
- Event history
|
|
- Detected QI emergences
|
|
- Metrics
|
|
|
|
State transitions happen when events are processed.
|
|
Each event can trigger:
|
|
- Actor state changes
|
|
- New QI emergence detection
|
|
- Metric updates
|
|
- Causal graph updates
|
|
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any
|
|
import json
|
|
|
|
from qlm_lab.models import (
|
|
IntelligenceType,
|
|
IntelligenceLayer,
|
|
Actor,
|
|
ActorType,
|
|
ActorRole,
|
|
ActorState,
|
|
QLMEvent,
|
|
EventType,
|
|
QIEmergence,
|
|
QLMMetrics,
|
|
QI_PATTERNS,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class StateTransition:
|
|
"""
|
|
Represents a state transition caused by an event.
|
|
|
|
This allows introspection: "What changed when event X happened?"
|
|
"""
|
|
event_id: str
|
|
timestamp: datetime
|
|
before_snapshot: Dict[str, Any]
|
|
after_snapshot: Dict[str, Any]
|
|
changes: List[str] # Human-readable list of changes
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"event_id": self.event_id,
|
|
"timestamp": self.timestamp.isoformat(),
|
|
"before_snapshot": self.before_snapshot,
|
|
"after_snapshot": self.after_snapshot,
|
|
"changes": self.changes,
|
|
}
|
|
|
|
|
|
class QLMState:
|
|
"""
|
|
The complete state of the Quantum Intelligence system.
|
|
|
|
This is the "brain" of QLM. It tracks everything:
|
|
- All intelligence layers
|
|
- All actors
|
|
- All events
|
|
- All emergences
|
|
- All metrics
|
|
|
|
Key methods:
|
|
- ingest_event(): Process a new event and update state
|
|
- query(): Answer questions about current state
|
|
- explain_transition(): Explain why state changed
|
|
- detect_qi_emergence(): Find emergent patterns
|
|
- calculate_metrics(): Compute system metrics
|
|
"""
|
|
|
|
def __init__(self, intent_graph=None, agent_coordinator=None):
|
|
"""
|
|
Initialize QLM state.
|
|
|
|
Can integrate with existing cognitive systems:
|
|
- intent_graph: From cognitive.intent_graph
|
|
- agent_coordinator: From cognitive.agent_coordination
|
|
"""
|
|
# Integration with existing cognitive layer
|
|
self.intent_graph = intent_graph
|
|
self.agent_coordinator = agent_coordinator
|
|
|
|
# Intelligence layers
|
|
self.layers: Dict[IntelligenceType, IntelligenceLayer] = {
|
|
IntelligenceType.HI: IntelligenceLayer(type=IntelligenceType.HI),
|
|
IntelligenceType.AI: IntelligenceLayer(type=IntelligenceType.AI),
|
|
IntelligenceType.QI: IntelligenceLayer(type=IntelligenceType.QI),
|
|
}
|
|
|
|
# Event history (ordered by timestamp)
|
|
self.events: List[QLMEvent] = []
|
|
|
|
# Detected QI emergences
|
|
self.emergences: List[QIEmergence] = []
|
|
|
|
# State transition history
|
|
self.transitions: List[StateTransition] = []
|
|
|
|
# Metrics
|
|
self.metrics = QLMMetrics()
|
|
|
|
# Timestamps
|
|
self.created_at = datetime.now()
|
|
self.updated_at = datetime.now()
|
|
|
|
def register_actor(self, actor: Actor) -> None:
|
|
"""
|
|
Register an actor in the appropriate intelligence layer.
|
|
|
|
Human → HI layer
|
|
Agent → AI layer
|
|
System → (tracked but not in a specific layer)
|
|
"""
|
|
if actor.actor_type == ActorType.HUMAN:
|
|
self.layers[IntelligenceType.HI].add_actor(actor)
|
|
self.metrics.hi_actors += 1
|
|
elif actor.actor_type == ActorType.AGENT:
|
|
self.layers[IntelligenceType.AI].add_actor(actor)
|
|
self.metrics.ai_actors += 1
|
|
elif actor.actor_type == ActorType.SYSTEM:
|
|
# System actors are tracked but don't belong to HI/AI/QI layers
|
|
self.metrics.system_actors += 1
|
|
|
|
def ingest_event(self, event: QLMEvent) -> None:
|
|
"""
|
|
Ingest a new event and update state.
|
|
|
|
This is the core state transition function.
|
|
|
|
Steps:
|
|
1. Capture before-state snapshot
|
|
2. Add event to history
|
|
3. Update actor states
|
|
4. Detect QI emergence
|
|
5. Update metrics
|
|
6. Capture after-state snapshot
|
|
7. Record transition
|
|
"""
|
|
# 1. Before snapshot
|
|
before = self._create_snapshot()
|
|
|
|
# 2. Add event
|
|
self.events.append(event)
|
|
|
|
# 3. Update actor states
|
|
self._update_actor_state(event)
|
|
|
|
# 4. Update layer metrics
|
|
if event.source_layer == IntelligenceType.HI:
|
|
self.layers[IntelligenceType.HI].total_events += 1
|
|
self.metrics.hi_events += 1
|
|
elif event.source_layer == IntelligenceType.AI:
|
|
self.layers[IntelligenceType.AI].total_events += 1
|
|
self.metrics.ai_events += 1
|
|
elif event.source_layer == IntelligenceType.QI:
|
|
self.layers[IntelligenceType.QI].total_events += 1
|
|
self.metrics.qi_events += 1
|
|
|
|
# 5. Detect QI emergence
|
|
emergence = self._detect_qi_emergence(event)
|
|
if emergence:
|
|
self.emergences.append(emergence)
|
|
self.metrics.qi_events += 1
|
|
|
|
# 6. Update operator metrics
|
|
if event.event_type == EventType.OPERATOR_APPROVAL:
|
|
self.metrics.operator_approvals += 1
|
|
elif event.event_type == EventType.OPERATOR_VETO:
|
|
self.metrics.operator_vetoes += 1
|
|
elif event.event_type == EventType.OPERATOR_QUERY:
|
|
self.metrics.operator_queries += 1
|
|
|
|
# 7. After snapshot
|
|
after = self._create_snapshot()
|
|
|
|
# 8. Record transition
|
|
changes = self._compute_changes(before, after)
|
|
transition = StateTransition(
|
|
event_id=event.id,
|
|
timestamp=event.timestamp,
|
|
before_snapshot=before,
|
|
after_snapshot=after,
|
|
changes=changes,
|
|
)
|
|
self.transitions.append(transition)
|
|
|
|
self.updated_at = datetime.now()
|
|
|
|
def _update_actor_state(self, event: QLMEvent) -> None:
|
|
"""Update actor state based on event"""
|
|
actor_id = event.actor_id
|
|
|
|
# Find actor in any layer
|
|
actor = None
|
|
for layer in self.layers.values():
|
|
if actor_id in layer.actors:
|
|
actor = layer.actors[actor_id]
|
|
break
|
|
|
|
if not actor:
|
|
return
|
|
|
|
# Update actor state based on event type
|
|
if event.event_type in [EventType.AGENT_EXECUTION, EventType.OPERATOR_INTENT]:
|
|
actor.state = ActorState.ACTIVE
|
|
actor.current_task_id = event.task_id
|
|
|
|
elif event.event_type in [EventType.AGENT_COMPLETION]:
|
|
actor.state = ActorState.IDLE
|
|
actor.current_task_id = None
|
|
|
|
elif event.event_type == EventType.AGENT_ERROR:
|
|
actor.state = ActorState.BLOCKED
|
|
|
|
actor.last_active = event.timestamp
|
|
|
|
def _detect_qi_emergence(self, event: QLMEvent) -> Optional[QIEmergence]:
|
|
"""
|
|
Detect if this event (combined with recent events) represents QI emergence.
|
|
|
|
This is where the magic happens: detecting when 1 + 1 = 3.
|
|
"""
|
|
# Look at recent events (last 10)
|
|
recent_events = self.events[-10:]
|
|
|
|
# Check each known QI pattern
|
|
for pattern_name, pattern_def in QI_PATTERNS.items():
|
|
if self._matches_pattern(recent_events, pattern_name, pattern_def):
|
|
return QIEmergence(
|
|
pattern_name=pattern_name,
|
|
trigger_events=[e.id for e in recent_events[-3:]], # Last 3 events
|
|
confidence=0.8, # TODO: Implement proper confidence scoring
|
|
explanation=pattern_def["description"],
|
|
impact_score=self._calculate_impact(pattern_name),
|
|
)
|
|
|
|
return None
|
|
|
|
def _matches_pattern(
|
|
self, events: List[QLMEvent], pattern_name: str, pattern_def: Dict
|
|
) -> bool:
|
|
"""Check if a sequence of events matches a QI pattern"""
|
|
if pattern_name == "agent_self_correction":
|
|
# Look for: AGENT_ERROR followed by AGENT_EXECUTION with same task
|
|
for i in range(len(events) - 1):
|
|
if (
|
|
events[i].event_type == EventType.AGENT_ERROR
|
|
and events[i + 1].event_type == EventType.AGENT_EXECUTION
|
|
and events[i].task_id == events[i + 1].task_id
|
|
):
|
|
return True
|
|
|
|
elif pattern_name == "operator_feedback_loop":
|
|
# Look for: OPERATOR_INTENT → AGENT_COMPLETION → OPERATOR_APPROVAL
|
|
for i in range(len(events) - 2):
|
|
if (
|
|
events[i].event_type == EventType.OPERATOR_INTENT
|
|
and events[i + 1].event_type == EventType.AGENT_COMPLETION
|
|
and events[i + 2].event_type == EventType.OPERATOR_APPROVAL
|
|
):
|
|
self.metrics.feedback_loop_count += 1
|
|
return True
|
|
|
|
# TODO: Implement other pattern detectors
|
|
|
|
return False
|
|
|
|
def _calculate_impact(self, pattern_name: str) -> float:
|
|
"""Calculate impact score for an emergence pattern"""
|
|
significance_map = {
|
|
"very_high": 1.0,
|
|
"high": 0.8,
|
|
"medium": 0.5,
|
|
"low": 0.3,
|
|
}
|
|
pattern_def = QI_PATTERNS.get(pattern_name, {})
|
|
significance = pattern_def.get("significance", "medium")
|
|
return significance_map.get(significance, 0.5)
|
|
|
|
def _create_snapshot(self) -> Dict[str, Any]:
|
|
"""Create a snapshot of current state"""
|
|
return {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"total_events": len(self.events),
|
|
"total_emergences": len(self.emergences),
|
|
"metrics": self.metrics.to_dict(),
|
|
}
|
|
|
|
def _compute_changes(
|
|
self, before: Dict[str, Any], after: Dict[str, Any]
|
|
) -> List[str]:
|
|
"""Compute human-readable changes between states"""
|
|
changes = []
|
|
|
|
if after["total_events"] > before["total_events"]:
|
|
changes.append(f"New event added (total: {after['total_events']})")
|
|
|
|
if after["total_emergences"] > before["total_emergences"]:
|
|
changes.append(
|
|
f"QI emergence detected (total: {after['total_emergences']})"
|
|
)
|
|
|
|
# Compare metrics
|
|
before_metrics = before["metrics"]
|
|
after_metrics = after["metrics"]
|
|
|
|
if after_metrics["hi_events"] > before_metrics["hi_events"]:
|
|
changes.append("Operator activity increased")
|
|
|
|
if after_metrics["ai_events"] > before_metrics["ai_events"]:
|
|
changes.append("Agent activity increased")
|
|
|
|
return changes
|
|
|
|
def query(self, query_type: str, **kwargs) -> Any:
|
|
"""
|
|
Query the QLM state.
|
|
|
|
Examples:
|
|
- query("active_actors")
|
|
- query("events_by_type", event_type=EventType.OPERATOR_INTENT)
|
|
- query("emergences_by_pattern", pattern="agent_self_correction")
|
|
- query("metrics_summary")
|
|
"""
|
|
if query_type == "active_actors":
|
|
active = []
|
|
for layer in self.layers.values():
|
|
active.extend(layer.get_active_actors())
|
|
return active
|
|
|
|
elif query_type == "events_by_type":
|
|
event_type = kwargs.get("event_type")
|
|
return [e for e in self.events if e.event_type == event_type]
|
|
|
|
elif query_type == "events_by_actor":
|
|
actor_id = kwargs.get("actor_id")
|
|
return [e for e in self.events if e.actor_id == actor_id]
|
|
|
|
elif query_type == "events_in_timerange":
|
|
start = kwargs.get("start", datetime.now() - timedelta(days=1))
|
|
end = kwargs.get("end", datetime.now())
|
|
return [e for e in self.events if start <= e.timestamp <= end]
|
|
|
|
elif query_type == "emergences_by_pattern":
|
|
pattern = kwargs.get("pattern")
|
|
return [em for em in self.emergences if em.pattern_name == pattern]
|
|
|
|
elif query_type == "metrics_summary":
|
|
return self.metrics.to_dict()
|
|
|
|
elif query_type == "recent_transitions":
|
|
limit = kwargs.get("limit", 10)
|
|
return self.transitions[-limit:]
|
|
|
|
return None
|
|
|
|
def explain_transition(self, event_id: str) -> Optional[StateTransition]:
|
|
"""Explain what happened when a specific event occurred"""
|
|
for transition in self.transitions:
|
|
if transition.event_id == event_id:
|
|
return transition
|
|
return None
|
|
|
|
def calculate_alignment(self) -> float:
|
|
"""
|
|
Calculate HI-AI alignment.
|
|
|
|
This measures: "Is AI doing what the Operator intended?"
|
|
|
|
Approach:
|
|
- Look at OPERATOR_INTENT events
|
|
- Look at subsequent AGENT_COMPLETION events
|
|
- Check if agent actions align with intent
|
|
- Return alignment score (0.0 to 1.0)
|
|
"""
|
|
operator_intents = [
|
|
e for e in self.events if e.event_type == EventType.OPERATOR_INTENT
|
|
]
|
|
|
|
if not operator_intents:
|
|
return 1.0 # No intents = perfect alignment (vacuous truth)
|
|
|
|
aligned_count = 0
|
|
|
|
for intent_event in operator_intents:
|
|
# Find completions after this intent
|
|
completions_after = [
|
|
e
|
|
for e in self.events
|
|
if e.event_type == EventType.AGENT_COMPLETION
|
|
and e.timestamp > intent_event.timestamp
|
|
and e.intent_node_id == intent_event.intent_node_id
|
|
]
|
|
|
|
if completions_after:
|
|
# Check if any completion was approved
|
|
approvals = [
|
|
e
|
|
for e in self.events
|
|
if e.event_type == EventType.OPERATOR_APPROVAL
|
|
and e.timestamp > completions_after[0].timestamp
|
|
and e.intent_node_id == intent_event.intent_node_id
|
|
]
|
|
|
|
if approvals:
|
|
aligned_count += 1
|
|
|
|
alignment = aligned_count / len(operator_intents) if operator_intents else 1.0
|
|
self.metrics.hi_ai_alignment = alignment
|
|
return alignment
|
|
|
|
def summarize_for_operator(self, days: int = 7) -> str:
|
|
"""
|
|
Create a human-readable summary for the Operator.
|
|
|
|
This is what Alexa sees when she asks: "What happened this week?"
|
|
"""
|
|
cutoff = datetime.now() - timedelta(days=days)
|
|
recent_events = [e for e in self.events if e.timestamp >= cutoff]
|
|
|
|
# Count events by type
|
|
event_counts = {}
|
|
for event in recent_events:
|
|
event_type = event.event_type.value
|
|
event_counts[event_type] = event_counts.get(event_type, 0) + 1
|
|
|
|
# Get emergences
|
|
recent_emergences = [
|
|
em for em in self.emergences if em.timestamp >= cutoff
|
|
]
|
|
|
|
# Calculate alignment
|
|
alignment = self.calculate_alignment()
|
|
|
|
summary = f"""
|
|
QLM State Summary (Last {days} Days)
|
|
{'=' * 50}
|
|
|
|
📊 Activity Overview:
|
|
Total Events: {len(recent_events)}
|
|
HI (Operator) Events: {sum(1 for e in recent_events if e.source_layer == IntelligenceType.HI)}
|
|
AI (Agent) Events: {sum(1 for e in recent_events if e.source_layer == IntelligenceType.AI)}
|
|
System Events: {sum(1 for e in recent_events if 'system' in e.event_type.value)}
|
|
|
|
✨ QI Emergence:
|
|
Emergent Patterns Detected: {len(recent_emergences)}
|
|
"""
|
|
|
|
if recent_emergences:
|
|
summary += " Notable Emergences:\n"
|
|
for em in recent_emergences[:5]: # Top 5
|
|
summary += f" - {em.pattern_name}: {em.explanation}\n"
|
|
|
|
summary += f"""
|
|
🎯 Alignment:
|
|
HI-AI Alignment Score: {alignment:.2%}
|
|
Operator Approvals: {self.metrics.operator_approvals}
|
|
Operator Vetoes: {self.metrics.operator_vetoes}
|
|
Feedback Loops: {self.metrics.feedback_loop_count}
|
|
|
|
👥 Active Actors:
|
|
HI Layer: {len(self.layers[IntelligenceType.HI].get_active_actors())}
|
|
AI Layer: {len(self.layers[IntelligenceType.AI].get_active_actors())}
|
|
|
|
📈 Top Event Types:
|
|
"""
|
|
|
|
for event_type, count in sorted(
|
|
event_counts.items(), key=lambda x: x[1], reverse=True
|
|
)[:5]:
|
|
summary += f" {event_type}: {count}\n"
|
|
|
|
return summary
|
|
|
|
def export_json(self, file_path: str) -> None:
|
|
"""Export QLM state to JSON"""
|
|
data = {
|
|
"created_at": self.created_at.isoformat(),
|
|
"updated_at": self.updated_at.isoformat(),
|
|
"layers": {
|
|
layer_type.value: layer.to_dict()
|
|
for layer_type, layer in self.layers.items()
|
|
},
|
|
"events": [e.to_dict() for e in self.events],
|
|
"emergences": [em.to_dict() for em in self.emergences],
|
|
"metrics": self.metrics.to_dict(),
|
|
}
|
|
|
|
with open(file_path, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def import_json(self, file_path: str) -> None:
|
|
"""Import QLM state from JSON"""
|
|
with open(file_path, "r") as f:
|
|
data = json.load(f)
|
|
|
|
# TODO: Implement full deserialization
|
|
pass
|