Add complete QLM (Quantum Language Model) implementation

This commit introduces the QLM system - a stateful semantic layer for
tracking HI (Human Intelligence), AI (Agent Intelligence), and QI
(Quantum/Emergent Intelligence) in BlackRoad OS.

Core Features:
- HI/AI/QI intelligence layer modeling
- Event-driven state management
- QI emergence detection (agent self-correction, feedback loops, etc.)
- HI-AI alignment scoring
- Operator-facing query interface
- Reality ingestion (git, CI, agent logs)

Components Added:
- qlm_lab/models.py: Core data models (Actor, QLMEvent, QIEmergence, etc.)
- qlm_lab/state.py: State management and transition tracking
- qlm_lab/api.py: Public QLMInterface API
- qlm_lab/ingestion/: Git, CI, and agent log connectors
- qlm_lab/experiments/: Alignment and emergence validation
- qlm_lab/visualization.py: Timeline, actor graph, alignment plots
- qlm_lab/demo.py: Interactive demo script
- tests/test_qlm_core.py: Comprehensive test suite
- docs/QLM.md: Complete documentation (concepts, API, integration)

Usage:
  from qlm_lab.api import QLMInterface

  qlm = QLMInterface()
  qlm.record_operator_intent("Build feature X")
  qlm.record_agent_execution("agent-1", "Implement X", "task-1")
  summary = qlm.get_summary(days=7)

Run:
  python -m qlm_lab.demo
  python -m qlm_lab.experiments.alignment_detection
  pytest tests/test_qlm_core.py -v

Integrates with:
- cognitive/intent_graph.py (intent tracking)
- cognitive/agent_coordination.py (multi-agent coordination)
- operator_engine/scheduler.py (background analysis)

Next steps: Integrate with FastAPI backend, add Prism Console UI,
implement Lucidia language runtime.
This commit is contained in:
Claude
2025-11-18 08:15:06 +00:00
parent 1733282bb2
commit e478add607
15 changed files with 4534 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
"""
QLM Ingestion - Wire QLM to Reality
This module contains connectors that ingest real system data into QLM:
- Git commits → QLMEvents
- CI test results → System events
- Agent logs → Agent execution events
- Deployment events → System events
Each connector transforms external data into QLMEvents.
"""
from qlm_lab.ingestion.git import GitConnector
from qlm_lab.ingestion.ci import CIConnector
from qlm_lab.ingestion.agent_logs import AgentLogConnector
__all__ = ["GitConnector", "CIConnector", "AgentLogConnector"]

View File

@@ -0,0 +1,209 @@
"""
Agent Log Connector - Ingest agent execution logs into QLM
Parses agent logs and converts them into QLM events.
"""
import json
import re
from datetime import datetime
from typing import List, Optional, Dict, Any
import logging
from qlm_lab.models import EventType, ActorRole
from qlm_lab.api import QLMInterface
logger = logging.getLogger(__name__)
class AgentLogConnector:
"""
Connects QLM to agent execution logs.
Usage:
connector = AgentLogConnector(qlm=qlm_interface)
connector.ingest_log_file("/path/to/agent.log")
"""
def __init__(self, qlm: QLMInterface):
"""
Args:
qlm: QLMInterface instance
"""
self.qlm = qlm
def parse_log_line(self, line: str) -> Optional[Dict[str, Any]]:
"""
Parse a log line into structured data.
Expects format: [timestamp] [agent_id] [level] message
Args:
line: Log line string
Returns:
Parsed log data or None
"""
# Example: [2024-01-15 10:30:45] [agent-coder-001] [INFO] Task started: implement login
pattern = r"\[([^\]]+)\]\s*\[([^\]]+)\]\s*\[([^\]]+)\]\s*(.+)"
match = re.match(pattern, line)
if not match:
return None
timestamp_str, agent_id, level, message = match.groups()
try:
timestamp = datetime.fromisoformat(timestamp_str)
except ValueError:
timestamp = datetime.now()
return {
"timestamp": timestamp,
"agent_id": agent_id.strip(),
"level": level.strip(),
"message": message.strip(),
}
def ingest_log_line(self, log_data: Dict[str, Any]) -> Optional[Any]:
"""
Ingest a parsed log line into QLM.
Args:
log_data: Parsed log data from parse_log_line()
Returns:
Created QLMEvent or None
"""
agent_id = log_data["agent_id"]
message = log_data["message"]
# Register agent if not exists
# (In production, would check if already registered)
# Detect event type from message
message_lower = message.lower()
if "task started" in message_lower or "executing" in message_lower:
# Extract task description
task_desc = message.split(":", 1)[1].strip() if ":" in message else message
return self.qlm.record_agent_execution(
agent_id=agent_id,
task_description=task_desc,
metadata={"log_timestamp": log_data["timestamp"].isoformat()},
)
elif "task completed" in message_lower or "finished" in message_lower:
# Extract task ID if present
task_id = self._extract_task_id(message)
return self.qlm.record_agent_completion(
agent_id=agent_id,
task_id=task_id or "unknown",
success=True,
result={"message": message},
)
elif "error" in message_lower or "failed" in message_lower:
task_id = self._extract_task_id(message)
return self.qlm.record_agent_error(
agent_id=agent_id,
task_id=task_id or "unknown",
error=message,
)
elif "handoff" in message_lower or "passing to" in message_lower:
# Extract target agent
to_agent = self._extract_target_agent(message)
task_id = self._extract_task_id(message)
if to_agent:
return self.qlm.record_agent_handoff(
from_agent_id=agent_id,
to_agent_id=to_agent,
task_id=task_id or "unknown",
handoff_message=message,
)
return None
def _extract_task_id(self, message: str) -> Optional[str]:
"""Extract task ID from message if present"""
# Look for task-XXX or task_XXX pattern
match = re.search(r"task[_-](\w+)", message.lower())
if match:
return f"task-{match.group(1)}"
return None
def _extract_target_agent(self, message: str) -> Optional[str]:
"""Extract target agent ID from handoff message"""
# Look for "to agent-XXX" or "→ agent-XXX"
match = re.search(r"(?:to|→)\s+(agent-[\w-]+)", message.lower())
if match:
return match.group(1)
return None
def ingest_log_file(self, file_path: str) -> List[Any]:
"""
Ingest an entire log file.
Args:
file_path: Path to log file
Returns:
List of created QLMEvents
"""
events = []
try:
with open(file_path, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
log_data = self.parse_log_line(line)
if log_data:
event = self.ingest_log_line(log_data)
if event:
events.append(event)
logger.info(f"Ingested {len(events)} events from {file_path}")
return events
except FileNotFoundError:
logger.error(f"Log file not found: {file_path}")
return []
except Exception as e:
logger.error(f"Error ingesting log file: {e}")
return []
def ingest_structured_log(self, log_entries: List[Dict[str, Any]]) -> List[Any]:
"""
Ingest structured log entries (e.g., from JSON logs).
Args:
log_entries: List of log entry dictionaries
Returns:
List of created QLMEvents
"""
events = []
for entry in log_entries:
# Convert to standard format
log_data = {
"timestamp": datetime.fromisoformat(entry.get("timestamp", datetime.now().isoformat())),
"agent_id": entry.get("agent_id", "unknown"),
"level": entry.get("level", "INFO"),
"message": entry.get("message", ""),
}
event = self.ingest_log_line(log_data)
if event:
events.append(event)
logger.info(f"Ingested {len(events)} structured log entries")
return events

224
qlm_lab/ingestion/ci.py Normal file
View File

@@ -0,0 +1,224 @@
"""
CI Connector - Ingest CI/test results into QLM
Converts CI events into QLM system events:
- Test runs → SYSTEM_TEST events
- Build results → SYSTEM_BUILD events
- Deploy actions → SYSTEM_DEPLOY events
"""
import json
from datetime import datetime
from typing import List, Optional, Dict, Any
import logging
from qlm_lab.models import EventType
from qlm_lab.api import QLMInterface
logger = logging.getLogger(__name__)
class CIConnector:
"""
Connects QLM to CI/CD system (GitHub Actions, Jenkins, etc.)
Usage:
connector = CIConnector(qlm=qlm_interface)
connector.ingest_test_result(test_data)
connector.ingest_build_result(build_data)
"""
def __init__(self, qlm: QLMInterface):
"""
Args:
qlm: QLMInterface instance
"""
self.qlm = qlm
def ingest_test_result(
self,
test_name: str,
passed: bool,
duration_seconds: float,
failures: Optional[List[str]] = None,
commit_hash: Optional[str] = None,
task_id: Optional[str] = None,
) -> Any:
"""
Ingest a test run result.
Args:
test_name: Name of test suite
passed: Whether tests passed
duration_seconds: How long tests took
failures: List of failed test names
commit_hash: Related commit
task_id: Related task
Returns:
Created QLMEvent
"""
event_type = EventType.SYSTEM_TEST
description = f"Test '{test_name}': {'PASSED' if passed else 'FAILED'}"
metadata = {
"test_name": test_name,
"passed": passed,
"duration_seconds": duration_seconds,
"failures": failures or [],
"commit_hash": commit_hash,
}
event = self.qlm.record_system_event(
event_type=event_type,
description=description,
task_id=task_id,
metadata=metadata,
)
logger.info(f"Ingested test result: {test_name} - {'PASS' if passed else 'FAIL'}")
return event
def ingest_build_result(
self,
build_name: str,
success: bool,
duration_seconds: float,
artifacts: Optional[List[str]] = None,
commit_hash: Optional[str] = None,
task_id: Optional[str] = None,
) -> Any:
"""
Ingest a build result.
Args:
build_name: Name of build
success: Whether build succeeded
duration_seconds: Build duration
artifacts: List of produced artifacts
commit_hash: Related commit
task_id: Related task
Returns:
Created QLMEvent
"""
event_type = EventType.SYSTEM_BUILD
description = f"Build '{build_name}': {'SUCCESS' if success else 'FAILED'}"
metadata = {
"build_name": build_name,
"success": success,
"duration_seconds": duration_seconds,
"artifacts": artifacts or [],
"commit_hash": commit_hash,
}
event = self.qlm.record_system_event(
event_type=event_type,
description=description,
task_id=task_id,
metadata=metadata,
)
logger.info(f"Ingested build result: {build_name} - {'SUCCESS' if success else 'FAIL'}")
return event
def ingest_deploy_result(
self,
service_name: str,
environment: str,
success: bool,
version: Optional[str] = None,
commit_hash: Optional[str] = None,
task_id: Optional[str] = None,
) -> Any:
"""
Ingest a deployment result.
Args:
service_name: What was deployed
environment: Where (production, staging, etc.)
success: Whether deploy succeeded
version: Version deployed
commit_hash: Related commit
task_id: Related task
Returns:
Created QLMEvent
"""
event_type = EventType.SYSTEM_DEPLOY
description = f"Deploy '{service_name}' to {environment}: {'SUCCESS' if success else 'FAILED'}"
metadata = {
"service": service_name,
"environment": environment,
"success": success,
"version": version,
"commit_hash": commit_hash,
}
event = self.qlm.record_system_event(
event_type=event_type,
description=description,
task_id=task_id,
metadata=metadata,
)
logger.info(f"Ingested deploy: {service_name} to {environment} - {'SUCCESS' if success else 'FAIL'}")
return event
def ingest_from_github_actions(self, workflow_run: Dict[str, Any]) -> List[Any]:
"""
Ingest events from a GitHub Actions workflow run.
Args:
workflow_run: GitHub Actions workflow run data (JSON)
Returns:
List of created QLMEvents
"""
events = []
# Extract data from workflow
name = workflow_run.get("name", "Unknown workflow")
conclusion = workflow_run.get("conclusion", "unknown")
success = conclusion == "success"
# Get commit
head_commit = workflow_run.get("head_commit", {})
commit_hash = head_commit.get("id", None)
# Create test event (assuming workflow is tests)
if "test" in name.lower():
event = self.ingest_test_result(
test_name=name,
passed=success,
duration_seconds=0, # Would need to calculate from timestamps
commit_hash=commit_hash,
)
events.append(event)
# Create build event (assuming workflow builds)
elif "build" in name.lower():
event = self.ingest_build_result(
build_name=name,
success=success,
duration_seconds=0,
commit_hash=commit_hash,
)
events.append(event)
# Create deploy event
elif "deploy" in name.lower():
event = self.ingest_deploy_result(
service_name=name,
environment="production", # Would need to parse from workflow
success=success,
commit_hash=commit_hash,
)
events.append(event)
return events

240
qlm_lab/ingestion/git.py Normal file
View File

@@ -0,0 +1,240 @@
"""
Git Connector - Ingest git history into QLM
Converts git commits into QLM events:
- Commits by humans → OPERATOR_INTENT or HI events
- Commits by bots/agents → AGENT_EXECUTION events
- Merge commits → coordination events
"""
import subprocess
import re
from datetime import datetime
from typing import List, Optional, Dict, Any
import logging
from qlm_lab.models import (
QLMEvent,
EventType,
IntelligenceType,
ActorType,
ActorRole,
Actor,
)
from qlm_lab.api import QLMInterface
logger = logging.getLogger(__name__)
class GitConnector:
"""
Connects QLM to git repository history.
Usage:
connector = GitConnector(repo_path="/path/to/repo", qlm=qlm_interface)
events = connector.ingest_recent_commits(days=7)
"""
def __init__(self, repo_path: str, qlm: QLMInterface):
"""
Args:
repo_path: Path to git repository
qlm: QLMInterface instance
"""
self.repo_path = repo_path
self.qlm = qlm
# Patterns to detect agent commits
self.agent_patterns = [
r"^claude/", # Claude branches
r"^copilot/", # Copilot branches
r"^codex/", # Codex branches
r"\[bot\]", # Bot commit messages
r"\[agent\]", # Agent commit messages
]
def is_agent_commit(self, commit_data: Dict[str, str]) -> bool:
"""Determine if a commit was made by an agent"""
# Check author name/email
author = commit_data.get("author", "").lower()
if any(
pattern in author
for pattern in ["bot", "agent", "claude", "copilot", "codex"]
):
return True
# Check branch name
branch = commit_data.get("branch", "")
for pattern in self.agent_patterns:
if re.search(pattern, branch):
return True
# Check commit message
message = commit_data.get("message", "")
if "[agent]" in message.lower() or "[bot]" in message.lower():
return True
return False
def get_git_log(
self, since: Optional[str] = None, until: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get git log as structured data.
Args:
since: Start date (e.g., "7 days ago")
until: End date (e.g., "now")
Returns:
List of commit dictionaries
"""
cmd = [
"git",
"-C",
self.repo_path,
"log",
"--pretty=format:%H|%an|%ae|%at|%s|%b",
"--all",
]
if since:
cmd.append(f"--since={since}")
if until:
cmd.append(f"--until={until}")
try:
result = subprocess.run(
cmd, capture_output=True, text=True, check=True, timeout=30
)
commits = []
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split("|", 5)
if len(parts) < 5:
continue
commit_hash, author_name, author_email, timestamp, subject = parts[:5]
body = parts[5] if len(parts) > 5 else ""
commits.append(
{
"hash": commit_hash,
"author": author_name,
"email": author_email,
"timestamp": int(timestamp),
"subject": subject,
"body": body,
"message": f"{subject}\n{body}".strip(),
}
)
return commits
except subprocess.CalledProcessError as e:
logger.error(f"Git log failed: {e}")
return []
except subprocess.TimeoutExpired:
logger.error("Git log timed out")
return []
def ingest_commit(self, commit: Dict[str, Any]) -> Optional[QLMEvent]:
"""
Ingest a single commit into QLM.
Args:
commit: Commit data from get_git_log()
Returns:
Created QLMEvent or None
"""
is_agent = self.is_agent_commit(commit)
# Determine actor
author = commit["author"]
actor_id = (
f"agent-{author.lower().replace(' ', '-')}"
if is_agent
else f"human-{author.lower().replace(' ', '-')}"
)
# Register actor if not exists
actor_type = ActorType.AGENT if is_agent else ActorType.HUMAN
# Create event
if is_agent:
# Agent commit = AGENT_EXECUTION
event = self.qlm.record_agent_execution(
agent_id=actor_id,
task_description=commit["subject"],
metadata={
"commit_hash": commit["hash"],
"commit_message": commit["message"],
"timestamp": commit["timestamp"],
"author": author,
},
)
else:
# Human commit = OPERATOR_INTENT (assuming commits reflect intent)
event = self.qlm.record_operator_intent(
intent=commit["subject"],
description=commit["body"],
metadata={
"commit_hash": commit["hash"],
"timestamp": commit["timestamp"],
"author": author,
},
)
logger.info(f"Ingested commit: {commit['hash'][:8]} - {commit['subject']}")
return event
def ingest_recent_commits(self, days: int = 7) -> List[QLMEvent]:
"""
Ingest recent commits into QLM.
Args:
days: Number of days to look back
Returns:
List of created QLMEvents
"""
logger.info(f"Ingesting git commits from last {days} days...")
commits = self.get_git_log(since=f"{days} days ago")
events = []
for commit in commits:
event = self.ingest_commit(commit)
if event:
events.append(event)
logger.info(f"Ingested {len(events)} commits")
return events
def ingest_commit_range(
self, since: str, until: Optional[str] = None
) -> List[QLMEvent]:
"""
Ingest commits in a specific range.
Args:
since: Start date (e.g., "2024-01-01")
until: End date (default: now)
Returns:
List of created QLMEvents
"""
commits = self.get_git_log(since=since, until=until)
events = []
for commit in commits:
event = self.ingest_commit(commit)
if event:
events.append(event)
logger.info(f"Ingested {len(events)} commits from {since} to {until or 'now'}")
return events