Files
blackroad-operating-system/qlm_lab/state.py
Claude e478add607 Add complete QLM (Quantum Language Model) implementation
This commit introduces the QLM system - a stateful semantic layer for
tracking HI (Human Intelligence), AI (Agent Intelligence), and QI
(Quantum/Emergent Intelligence) in BlackRoad OS.

Core Features:
- HI/AI/QI intelligence layer modeling
- Event-driven state management
- QI emergence detection (agent self-correction, feedback loops, etc.)
- HI-AI alignment scoring
- Operator-facing query interface
- Reality ingestion (git, CI, agent logs)

Components Added:
- qlm_lab/models.py: Core data models (Actor, QLMEvent, QIEmergence, etc.)
- qlm_lab/state.py: State management and transition tracking
- qlm_lab/api.py: Public QLMInterface API
- qlm_lab/ingestion/: Git, CI, and agent log connectors
- qlm_lab/experiments/: Alignment and emergence validation
- qlm_lab/visualization.py: Timeline, actor graph, alignment plots
- qlm_lab/demo.py: Interactive demo script
- tests/test_qlm_core.py: Comprehensive test suite
- docs/QLM.md: Complete documentation (concepts, API, integration)

Usage:
  from qlm_lab.api import QLMInterface

  qlm = QLMInterface()
  qlm.record_operator_intent("Build feature X")
  qlm.record_agent_execution("agent-1", "Implement X", "task-1")
  summary = qlm.get_summary(days=7)

Run:
  python -m qlm_lab.demo
  python -m qlm_lab.experiments.alignment_detection
  pytest tests/test_qlm_core.py -v

Integrates with:
- cognitive/intent_graph.py (intent tracking)
- cognitive/agent_coordination.py (multi-agent coordination)
- operator_engine/scheduler.py (background analysis)

Next steps: Integrate with FastAPI backend, add Prism Console UI,
implement Lucidia language runtime.
2025-11-18 08:15:06 +00:00

511 lines
17 KiB
Python

"""
QLM State Management - The core state machine
QLMState represents the complete state of the Quantum Intelligence system at a point in time.
State includes:
- All intelligence layers (HI, AI, QI)
- All actors and their current state
- Event history
- Detected QI emergences
- Metrics
State transitions happen when events are processed.
Each event can trigger:
- Actor state changes
- New QI emergence detection
- Metric updates
- Causal graph updates
"""
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import json
from qlm_lab.models import (
IntelligenceType,
IntelligenceLayer,
Actor,
ActorType,
ActorRole,
ActorState,
QLMEvent,
EventType,
QIEmergence,
QLMMetrics,
QI_PATTERNS,
)
@dataclass
class StateTransition:
    """
    Record of how the state changed while one event was processed.

    Keeps the snapshot taken before the event, the snapshot taken after it,
    and a list of human-readable change descriptions, so a caller can ask
    "What changed when event X happened?".
    """

    event_id: str
    timestamp: datetime
    before_snapshot: Dict[str, Any]
    after_snapshot: Dict[str, Any]
    changes: List[str]  # Human-readable descriptions of what changed

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict; the timestamp is rendered with isoformat()."""
        serialized: Dict[str, Any] = {"event_id": self.event_id}
        serialized["timestamp"] = self.timestamp.isoformat()
        # Snapshots and changes are passed through by reference, not copied.
        serialized.update(
            before_snapshot=self.before_snapshot,
            after_snapshot=self.after_snapshot,
            changes=self.changes,
        )
        return serialized
class QLMState:
"""
The complete state of the Quantum Intelligence system.
This is the "brain" of QLM. It tracks everything:
- All intelligence layers
- All actors
- All events
- All emergences
- All metrics
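Key methods:
- register_actor(): Add an actor to the HI or AI layer
- ingest_event(): Process a new event and update state
  (QI emergence detection and metric updates run inside this call)
- query(): Answer questions about current state
- explain_transition(): Explain why state changed
- calculate_alignment(): Compute the HI-AI alignment score
- summarize_for_operator(): Build a human-readable summary
- export_json(): Write the state to a JSON file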
"""
def __init__(self, intent_graph=None, agent_coordinator=None):
    """
    Build an empty QLM state.

    Both arguments are optional hooks into existing cognitive systems and
    are only stored on the instance here:
    - intent_graph: From cognitive.intent_graph
    - agent_coordinator: From cognitive.agent_coordination
    """
    # Optional links to the existing cognitive layer
    self.intent_graph = intent_graph
    self.agent_coordinator = agent_coordinator

    # One intelligence layer per type, keyed by that type
    layer_types = (IntelligenceType.HI, IntelligenceType.AI, IntelligenceType.QI)
    self.layers: Dict[IntelligenceType, IntelligenceLayer] = {
        layer_type: IntelligenceLayer(type=layer_type) for layer_type in layer_types
    }

    # Histories, all starting empty and growing in ingestion order
    self.events: List[QLMEvent] = []
    self.emergences: List[QIEmergence] = []
    self.transitions: List[StateTransition] = []

    # Aggregate counters and scores
    self.metrics = QLMMetrics()

    # Creation time and time of the last state change
    self.created_at = datetime.now()
    self.updated_at = datetime.now()
def register_actor(self, actor: Actor) -> None:
    """
    Place an actor in its intelligence layer and bump the matching counter.

    Human  -> added to the HI layer, metrics.hi_actors += 1
    Agent  -> added to the AI layer, metrics.ai_actors += 1
    System -> only counted (metrics.system_actors += 1), not stored in a layer

    Any other actor type is ignored.

    NOTE(review): the counter is incremented unconditionally, so registering
    the same actor twice counts it twice.
    """
    kind = actor.actor_type

    if kind == ActorType.HUMAN:
        layer_type, counter = IntelligenceType.HI, "hi_actors"
    elif kind == ActorType.AGENT:
        layer_type, counter = IntelligenceType.AI, "ai_actors"
    elif kind == ActorType.SYSTEM:
        # System actors are counted but belong to no HI/AI/QI layer
        layer_type, counter = None, "system_actors"
    else:
        return

    if layer_type is not None:
        self.layers[layer_type].add_actor(actor)
    setattr(self.metrics, counter, getattr(self.metrics, counter) + 1)
def ingest_event(self, event: QLMEvent) -> None:
    """
    Apply one event to the state and record the resulting transition.

    Processing order:
    1. Snapshot the state before the event
    2. Append the event to the history
    3. Update the state of the actor named by the event
    4. Bump the event counters of the event's source layer
    5. Run QI emergence detection
    6. Bump the operator approval / veto / query counters
    7. Snapshot the state after the event
    8. Store a StateTransition describing the difference
    """
    snapshot_before = self._create_snapshot()

    self.events.append(event)
    self._update_actor_state(event)

    # Per-layer counters: the first layer equal to the event's source wins
    layer_counters = (
        (IntelligenceType.HI, "hi_events"),
        (IntelligenceType.AI, "ai_events"),
        (IntelligenceType.QI, "qi_events"),
    )
    for layer_type, counter in layer_counters:
        if event.source_layer == layer_type:
            self.layers[layer_type].total_events += 1
            setattr(self.metrics, counter, getattr(self.metrics, counter) + 1)
            break

    # A detected emergence is stored and also counted as a QI event
    detected = self._detect_qi_emergence(event)
    if detected:
        self.emergences.append(detected)
        self.metrics.qi_events += 1

    # Operator interaction counters
    operator_counters = (
        (EventType.OPERATOR_APPROVAL, "operator_approvals"),
        (EventType.OPERATOR_VETO, "operator_vetoes"),
        (EventType.OPERATOR_QUERY, "operator_queries"),
    )
    for kind, counter in operator_counters:
        if event.event_type == kind:
            setattr(self.metrics, counter, getattr(self.metrics, counter) + 1)
            break

    snapshot_after = self._create_snapshot()

    self.transitions.append(
        StateTransition(
            event_id=event.id,
            timestamp=event.timestamp,
            before_snapshot=snapshot_before,
            after_snapshot=snapshot_after,
            changes=self._compute_changes(snapshot_before, snapshot_after),
        )
    )
    self.updated_at = datetime.now()
def _update_actor_state(self, event: QLMEvent) -> None:
    """
    Move the event's actor to the state implied by the event type.

    AGENT_EXECUTION / OPERATOR_INTENT -> ACTIVE, current task set from event
    AGENT_COMPLETION                  -> IDLE, current task cleared
    AGENT_ERROR                       -> BLOCKED (current task left as is)

    The actor's last_active is set to the event timestamp for every event
    type. Nothing happens when the actor id is not present in any layer.
    """
    wanted_id = event.actor_id

    # First layer that knows this actor id wins
    actor = next(
        (
            layer.actors[wanted_id]
            for layer in self.layers.values()
            if wanted_id in layer.actors
        ),
        None,
    )
    if not actor:
        return

    if event.event_type in (EventType.AGENT_EXECUTION, EventType.OPERATOR_INTENT):
        actor.state = ActorState.ACTIVE
        actor.current_task_id = event.task_id
    elif event.event_type in (EventType.AGENT_COMPLETION,):
        actor.state = ActorState.IDLE
        actor.current_task_id = None
    elif event.event_type == EventType.AGENT_ERROR:
        actor.state = ActorState.BLOCKED

    actor.last_active = event.timestamp
def _detect_qi_emergence(self, event: QLMEvent) -> Optional[QIEmergence]:
    """
    Check the most recent events against every known QI pattern.

    The window is the last 10 entries of self.events; the ``event`` argument
    itself is not inspected. Patterns are tried in QI_PATTERNS order and the
    first one that matches produces the returned QIEmergence. Returns None
    when no pattern matches.
    """
    window = self.events[-10:]

    for name, spec in QI_PATTERNS.items():
        if not self._matches_pattern(window, name, spec):
            continue
        return QIEmergence(
            pattern_name=name,
            # The three newest events in the window are reported as triggers
            trigger_events=[recent.id for recent in window[-3:]],
            confidence=0.8,  # TODO: Implement proper confidence scoring
            explanation=spec["description"],
            impact_score=self._calculate_impact(name),
        )

    return None
def _matches_pattern(
    self, events: List[QLMEvent], pattern_name: str, pattern_def: Dict
) -> bool:
    """
    Report whether the newest events in ``events`` complete a QI pattern.

    A pattern only counts when its final step is the last element of
    ``events``. The previous implementation scanned the whole window, so a
    single occurrence kept matching on every later event until it scrolled
    out of the window. That produced duplicate emergences, inflated
    feedback_loop_count, and let an older match hide newer patterns.

    Supported patterns:
    - "agent_self_correction": AGENT_ERROR directly followed by
      AGENT_EXECUTION with an equal task_id
    - "operator_feedback_loop": OPERATOR_INTENT, AGENT_COMPLETION,
      OPERATOR_APPROVAL as three consecutive events

    Args:
        events: Events in ingestion order, newest last.
        pattern_name: Key identifying the pattern to test.
        pattern_def: Pattern definition; currently unused by the detectors.

    Returns:
        True when the pattern ends at the newest event, otherwise False.
        A successful "operator_feedback_loop" match also increments
        self.metrics.feedback_loop_count.
    """
    if pattern_name == "agent_self_correction":
        if len(events) < 2:
            return False
        failed, retried = events[-2], events[-1]
        # NOTE(review): two events whose task_id is None compare equal here.
        return (
            failed.event_type == EventType.AGENT_ERROR
            and retried.event_type == EventType.AGENT_EXECUTION
            and failed.task_id == retried.task_id
        )

    if pattern_name == "operator_feedback_loop":
        if len(events) < 3:
            return False
        intent, completion, approval = events[-3:]
        if (
            intent.event_type == EventType.OPERATOR_INTENT
            and completion.event_type == EventType.AGENT_COMPLETION
            and approval.event_type == EventType.OPERATOR_APPROVAL
        ):
            # Counted once per loop because the match is anchored to the tail
            self.metrics.feedback_loop_count += 1
            return True
        return False

    # TODO: Implement other pattern detectors
    return False
def _calculate_impact(self, pattern_name: str) -> float:
    """
    Translate a pattern's "significance" label into a numeric impact score.

    Unknown patterns, patterns without a significance entry, and
    unrecognized labels all score 0.5 (the "medium" value).
    """
    fallback_score = 0.5
    score_by_label = {
        "very_high": 1.0,
        "high": 0.8,
        "medium": fallback_score,
        "low": 0.3,
    }
    label = QI_PATTERNS.get(pattern_name, {}).get("significance", "medium")
    return score_by_label.get(label, fallback_score)
def _create_snapshot(self) -> Dict[str, Any]:
"""Create a snapshot of current state"""
return {
"timestamp": datetime.now().isoformat(),
"total_events": len(self.events),
"total_emergences": len(self.emergences),
"metrics": self.metrics.to_dict(),
}
def _compute_changes(
self, before: Dict[str, Any], after: Dict[str, Any]
) -> List[str]:
"""Compute human-readable changes between states"""
changes = []
if after["total_events"] > before["total_events"]:
changes.append(f"New event added (total: {after['total_events']})")
if after["total_emergences"] > before["total_emergences"]:
changes.append(
f"QI emergence detected (total: {after['total_emergences']})"
)
# Compare metrics
before_metrics = before["metrics"]
after_metrics = after["metrics"]
if after_metrics["hi_events"] > before_metrics["hi_events"]:
changes.append("Operator activity increased")
if after_metrics["ai_events"] > before_metrics["ai_events"]:
changes.append("Agent activity increased")
return changes
def query(self, query_type: str, **kwargs) -> Any:
    """
    Answer a named query about the current QLM state.

    Supported ``query_type`` values (keyword arguments in parentheses):
    - "active_actors": active actors gathered from every layer
    - "events_by_type" (event_type): events with that event type
    - "events_by_actor" (actor_id): events recorded for that actor id
    - "events_in_timerange" (start, end): events with
      start <= timestamp <= end; a missing or None bound defaults to
      24 hours ago for start and now for end
    - "emergences_by_pattern" (pattern): emergences with that pattern name
    - "metrics_summary": the metrics as a dict
    - "recent_transitions" (limit, default 10): the newest ``limit``
      transitions, oldest first; a limit of zero or less gives an empty list

    Examples:
    - query("active_actors")
    - query("events_by_type", event_type=EventType.OPERATOR_INTENT)
    - query("emergences_by_pattern", pattern="agent_self_correction")
    - query("metrics_summary")

    Returns:
        The query result, or None when ``query_type`` is not recognized.
    """
    if query_type == "active_actors":
        active = []
        for layer in self.layers.values():
            active.extend(layer.get_active_actors())
        return active

    if query_type == "events_by_type":
        event_type = kwargs.get("event_type")
        return [e for e in self.events if e.event_type == event_type]

    if query_type == "events_by_actor":
        actor_id = kwargs.get("actor_id")
        return [e for e in self.events if e.actor_id == actor_id]

    if query_type == "events_in_timerange":
        # Treat an explicit None like an omitted bound instead of failing
        # later when None is compared with a datetime.
        start = kwargs.get("start")
        if start is None:
            start = datetime.now() - timedelta(days=1)
        end = kwargs.get("end")
        if end is None:
            end = datetime.now()
        return [e for e in self.events if start <= e.timestamp <= end]

    if query_type == "emergences_by_pattern":
        pattern = kwargs.get("pattern")
        return [em for em in self.emergences if em.pattern_name == pattern]

    if query_type == "metrics_summary":
        return self.metrics.to_dict()

    if query_type == "recent_transitions":
        limit = kwargs.get("limit")
        if limit is None:
            limit = 10
        # A bare [-limit:] slice returns the entire history for limit == 0
        # (because -0 == 0) and drops items from the front for negative
        # limits, so non-positive limits are handled explicitly.
        if limit <= 0:
            return []
        return self.transitions[-limit:]

    return None
def explain_transition(self, event_id: str) -> Optional[StateTransition]:
    """
    Look up the recorded transition for one event.

    Returns the first stored transition whose event_id equals ``event_id``,
    or None when no transition was recorded for it.
    """
    matching = (
        candidate
        for candidate in self.transitions
        if candidate.event_id == event_id
    )
    return next(matching, None)
def calculate_alignment(self) -> float:
    """
    Score how often operator intents ended in an approved agent completion.

    This measures: "Is AI doing what the Operator intended?"

    An OPERATOR_INTENT counts as aligned when:
    - at least one AGENT_COMPLETION with an equal intent_node_id has a later
      timestamp than the intent, and
    - at least one OPERATOR_APPROVAL with an equal intent_node_id has a later
      timestamp than the first such completion (first in history order).

    Returns:
        aligned intents / all intents, a value from 0.0 to 1.0. The result
        is also stored in metrics.hi_ai_alignment. With no intents at all
        the method returns 1.0 (vacuous truth) without touching the metric.
    """
    intents = [
        ev for ev in self.events if ev.event_type == EventType.OPERATOR_INTENT
    ]
    if not intents:
        return 1.0

    def was_approved(intent) -> bool:
        """Return True when the intent has a completion that was later approved."""
        completions = [
            ev
            for ev in self.events
            if ev.event_type == EventType.AGENT_COMPLETION
            and ev.timestamp > intent.timestamp
            and ev.intent_node_id == intent.intent_node_id
        ]
        if not completions:
            return False

        first_completion = completions[0]
        approvals = [
            ev
            for ev in self.events
            if ev.event_type == EventType.OPERATOR_APPROVAL
            and ev.timestamp > first_completion.timestamp
            and ev.intent_node_id == intent.intent_node_id
        ]
        return bool(approvals)

    aligned_total = sum(1 for intent in intents if was_approved(intent))
    score = aligned_total / len(intents)
    self.metrics.hi_ai_alignment = score
    return score
def summarize_for_operator(self, days: int = 7) -> str:
    """
    Build the plain-text report shown to the Operator.

    Covers events and emergences whose timestamp falls within the last
    ``days`` days: activity counts, detected emergences (the first five in
    history order are listed), the current alignment score, operator
    counters, active actor counts, and the five most frequent event types.

    Calling this also recomputes the alignment via calculate_alignment().
    """
    window_start = datetime.now() - timedelta(days=days)
    in_window = [ev for ev in self.events if ev.timestamp >= window_start]

    # Tally events per event-type value
    per_type = {}
    for ev in in_window:
        label = ev.event_type.value
        per_type[label] = per_type.get(label, 0) + 1

    emerged = [em for em in self.emergences if em.timestamp >= window_start]
    alignment = self.calculate_alignment()

    hi_total = sum(1 for ev in in_window if ev.source_layer == IntelligenceType.HI)
    ai_total = sum(1 for ev in in_window if ev.source_layer == IntelligenceType.AI)
    system_total = sum(1 for ev in in_window if 'system' in ev.event_type.value)

    sections = [
        f"""
QLM State Summary (Last {days} Days)
{'=' * 50}
📊 Activity Overview:
Total Events: {len(in_window)}
HI (Operator) Events: {hi_total}
AI (Agent) Events: {ai_total}
System Events: {system_total}
✨ QI Emergence:
Emergent Patterns Detected: {len(emerged)}
"""
    ]

    if emerged:
        sections.append(" Notable Emergences:\n")
        sections.extend(
            f" - {em.pattern_name}: {em.explanation}\n" for em in emerged[:5]
        )

    hi_active = len(self.layers[IntelligenceType.HI].get_active_actors())
    ai_active = len(self.layers[IntelligenceType.AI].get_active_actors())
    sections.append(
        f"""
🎯 Alignment:
HI-AI Alignment Score: {alignment:.2%}
Operator Approvals: {self.metrics.operator_approvals}
Operator Vetoes: {self.metrics.operator_vetoes}
Feedback Loops: {self.metrics.feedback_loop_count}
👥 Active Actors:
HI Layer: {hi_active}
AI Layer: {ai_active}
📈 Top Event Types:
"""
    )

    # Most frequent first; ties keep first-seen order because sort is stable
    ranked = sorted(per_type.items(), key=lambda pair: pair[1], reverse=True)
    sections.extend(f" {label}: {count}\n" for label, count in ranked[:5])

    return "".join(sections)
def export_json(self, file_path: str) -> None:
    """
    Write the QLM state to ``file_path`` as indented JSON.

    The document contains the creation/update timestamps (isoformat text),
    every layer keyed by its type value, all events, all emergences, and
    the metrics. Transitions are not exported.

    The state is serialized to text before the file is opened. The previous
    implementation streamed into an already-truncated file, so a value that
    JSON cannot encode destroyed whatever export was there before.

    Raises:
        TypeError: If any part of the state is not JSON-serializable; the
            target file is left untouched in that case.
        OSError: If the file cannot be opened or written.
    """
    data = {
        "created_at": self.created_at.isoformat(),
        "updated_at": self.updated_at.isoformat(),
        "layers": {
            layer_type.value: layer.to_dict()
            for layer_type, layer in self.layers.items()
        },
        "events": [e.to_dict() for e in self.events],
        "emergences": [em.to_dict() for em in self.emergences],
        "metrics": self.metrics.to_dict(),
    }

    # Serialize first: any encoding error surfaces before the file is touched
    payload = json.dumps(data, indent=2)

    # Explicit encoding keeps the output independent of the platform default
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(payload)
def import_json(self, file_path: str) -> None:
    """
    Read a JSON state file (placeholder; nothing is restored yet).

    The file is opened and parsed, so a missing file or malformed JSON
    still raises, but the parsed content is discarded and the state is
    left unchanged.
    """
    with open(file_path, "r") as source:
        json.load(source)  # parsed only; result intentionally unused for now
    # TODO: Implement full deserialization