mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-17 00:57:12 -05:00
This is what AI collaboration should have been from day one. A comprehensive cognitive layer that solves the fundamental problems of context loss, information silos, and coordination chaos. ## Core Components **Intent Graph** - Tracks WHY things happen - Every goal, task, and decision has a rationale - Relationships between objectives are explicit - Context is never lost **Semantic File System** - Files that know what they ARE - Auto-classification based on content and purpose - Semantic search (find by meaning, not just name) - Auto-organization (no more downloads folder chaos) - Files suggest where they belong **Living Documents** - Self-updating documentation - Code-aware: understands what code it documents - Detects when code changes and docs are stale - Can auto-generate from code - Always in sync **Context Engine** - Right information at the right time - Provides relevant context based on current task - Integrates intent, code, docs, and decisions - Proactive intelligence (suggests next actions) - Answers: "Why does this exist?" "What's related?" 
**Agent Coordination Protocol** - Multi-agent collaboration that works - Shared context via cognitive layer - Clear task ownership and handoffs - No duplicate work - Conflict resolution - Progress tracking **Smart Documents** - OCR, templates, auto-formatting - Extract text from PDFs and images - Identify document types automatically - ATS-friendly resume formatting - Business plan templates - Auto-filing based on content - Template matching and application ## What This Solves Traditional problems: ❌ Files in arbitrary folders ❌ Context lives in people's heads ❌ Docs get out of sync ❌ Multi-agent chaos ❌ Downloads folder anarchy ❌ Lost decisions and rationale Cognitive OS solutions: ✅ Files organize by meaning and purpose ✅ Context is captured and connected ✅ Docs update themselves ✅ Agents coordinate cleanly ✅ Everything auto-organizes ✅ Every decision is recorded with WHY ## Architecture cognitive/ ├── __init__.py # Main CognitiveOS integration ├── intent_graph.py # Goals, tasks, decisions, relationships ├── semantic_fs.py # Content-aware file organization ├── living_docs.py # Self-updating documentation ├── context_engine.py # Intelligent context retrieval ├── agent_coordination.py # Multi-agent collaboration ├── smart_documents.py # OCR, templates, auto-format ├── README.md # Vision and philosophy ├── USAGE.md # Complete usage guide ├── quickstart.py # Interactive demo └── requirements.txt # Optional dependencies ## Quick Start ```python from cognitive import CognitiveOS # Initialize cog = CognitiveOS() # Create a goal with rationale goal = cog.create_goal( "Build user authentication", rationale="Users need secure access" ) # Process a document (auto-classify, auto-organize) cog.process_new_file("~/Downloads/resume.pdf") # Get context for what you're working on context = cog.get_context(task_id="current-task") ``` ## Philosophy This is how AI and data should have been handled from the start: - **Semantic over Hierarchical**: Organize by meaning, not folders - 
**Intent-Preserving**: Capture WHY, not just WHAT - **Auto-Linking**: Related things connect automatically - **Context-Aware**: System knows what you're trying to do - **Agent-First**: Designed for AI-human collaboration Combines the best of Notion + Asana + actual code awareness + auto-organization + OCR + business planning + ATS-friendly formatting. No more hoping the world doesn't catch on fire. No more downloads folder chaos. No more lost context. This is the cognitive layer every OS should have had.
561 lines
19 KiB
Python
561 lines
19 KiB
Python
"""
|
|
Agent Coordination Protocol - Multi-agent collaboration that actually works
|
|
|
|
The problem with current multi-agent systems:
|
|
- Agents duplicate work
|
|
- No shared context
|
|
- Messy handoffs
|
|
- Lost information
|
|
- No clear ownership
|
|
|
|
This protocol solves that by providing:
|
|
- Shared context via the cognitive layer
|
|
- Clear task ownership
|
|
- Handoff protocols
|
|
- Conflict resolution
|
|
- Progress tracking
|
|
- Collaboration patterns
|
|
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Dict, List, Optional, Set, Any
|
|
from uuid import uuid4
|
|
import json
|
|
|
|
|
|
class AgentRole(Enum):
    """Specialization an agent can be registered with."""

    # Orchestrates the work of other agents.
    COORDINATOR = "coordinator"
    # Produces code.
    CODER = "coder"
    # Reviews code and documentation.
    REVIEWER = "reviewer"
    # Looks up information.
    RESEARCHER = "researcher"
    # Produces documentation.
    DOCUMENTER = "documenter"
    # Tests code.
    TESTER = "tester"
    # Draws up plans.
    PLANNER = "planner"
    # Carries out tasks.
    EXECUTOR = "executor"
|
|
|
|
|
|
class TaskStatus(Enum):
    """Lifecycle state of a task as it moves through the agent workflow."""

    PENDING = "pending"
    CLAIMED = "claimed"
    IN_PROGRESS = "in_progress"
    REVIEW_NEEDED = "review_needed"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
class HandoffType(Enum):
    """Kind of handoff that can take place between two agents (A and B)."""

    # A finishes its task, then B takes the next one.
    SEQUENTIAL = "sequential"
    # A and B work at the same time.
    PARALLEL = "parallel"
    # A produces the work, B reviews it.
    REVIEW = "review"
    # B helps A with the task A is currently on.
    ASSIST = "assist"
    # A hands a subtask to B.
    DELEGATE = "delegate"
|
|
|
|
|
|
@dataclass
class AgentInfo:
    """Record describing one agent known to the coordinator."""

    # Identity: a random UUID string is generated when no id is supplied.
    id: str = field(default_factory=lambda: str(uuid4()))
    name: str = ""

    # What the agent does and what it is able to do.
    role: AgentRole = AgentRole.EXECUTOR
    capabilities: Set[str] = field(default_factory=set)

    # Availability: None for current_task_id means the agent holds no task.
    current_task_id: Optional[str] = None
    active: bool = True

    # Bookkeeping timestamps; both default to the moment of construction.
    created_at: datetime = field(default_factory=datetime.now)
    last_seen: datetime = field(default_factory=datetime.now)

    # Free-form extra data.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class Handoff:
    """Record of one handoff of a task from one agent to another."""

    # A random UUID string is generated when no id is supplied.
    id: str = field(default_factory=lambda: str(uuid4()))

    # Who hands off, who receives, and which task is concerned.
    from_agent_id: str = ""
    to_agent_id: str = ""
    task_id: str = ""
    handoff_type: HandoffType = HandoffType.SEQUENTIAL

    # Information for the receiving agent, plus a human-readable note.
    context: Dict[str, Any] = field(default_factory=dict)
    message: str = ""

    # Timestamps; the last two stay None until the corresponding event.
    created_at: datetime = field(default_factory=datetime.now)
    accepted_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    # One of: pending, accepted, rejected, completed.
    status: str = "pending"
|
|
|
|
|
|
@dataclass
class CollaborationSession:
    """Record of one session in which several agents pursue a shared goal."""

    # A random UUID string is generated when no id is supplied.
    id: str = field(default_factory=lambda: str(uuid4()))

    # What the session is for.
    goal: str = ""
    description: str = ""

    # Participants (agent ids), the task ids in the session, and the
    # optional coordinating agent.
    agents: Set[str] = field(default_factory=set)
    task_ids: Set[str] = field(default_factory=set)
    coordinator_id: Optional[str] = None

    # Timestamps; the last two stay None until the corresponding event.
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    # One of: planning, active, completed, cancelled.
    status: str = "planning"

    # Free-form extra data.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class AgentCoordinator:
    """
    Coordinates multiple agents working together.

    All state is held in memory in four registries:

    - ``agents``: agent id -> AgentInfo
    - ``handoffs``: handoff id -> Handoff
    - ``sessions``: session id -> CollaborationSession
    - ``task_ownership``: task id -> id of the agent that owns the task

    Two optional collaborators are used when supplied:

    - ``intent_graph``: the code calls ``create_goal(title=..., description=...,
      metadata=...)`` on it and reads ``nodes[task_id]``.
    - ``context_engine``: the code calls ``get_context_for_task(task_id)`` on it
      and reads ``task_title`` and ``get_top_items(n)`` from the result.
    """

    def __init__(self, intent_graph=None, context_engine=None):
        """
        Create an empty coordinator.

        Both collaborators are optional; every method that uses one of them
        first checks that it is set.
        """
        self.intent_graph = intent_graph
        self.context_engine = context_engine
        self.agents: Dict[str, AgentInfo] = {}
        self.handoffs: Dict[str, Handoff] = {}
        self.sessions: Dict[str, CollaborationSession] = {}
        self.task_ownership: Dict[str, str] = {}  # task_id -> agent_id

    def register_agent(self, agent: AgentInfo) -> AgentInfo:
        """Store ``agent`` under its id (replacing any previous entry) and return it."""
        self.agents[agent.id] = agent
        return agent

    def create_session(self, goal: str, description: str = "",
                       coordinator_id: Optional[str] = None) -> CollaborationSession:
        """
        Create and register a new collaboration session.

        When an intent graph is attached, a goal is also created in it and the
        id of the returned node is stored in
        ``session.metadata['intent_node_id']``.
        """
        session = CollaborationSession(
            goal=goal,
            description=description,
            coordinator_id=coordinator_id,
        )
        self.sessions[session.id] = session

        # Only the injected graph object is used here; no import from the
        # graph's module is needed, so a duck-typed graph works as well.
        if self.intent_graph:
            goal_node = self.intent_graph.create_goal(
                title=goal,
                description=description,
                metadata={"session_id": session.id},
            )
            session.metadata['intent_node_id'] = goal_node.id

        return session

    def assign_task(self, task_id: str, agent_id: str,
                    context: Optional[Dict] = None) -> bool:
        """
        Assign a task to a specific agent.

        The assignment is refused (``False``) when the agent is unknown, is
        inactive, or already holds a task. On success the ownership table, the
        agent's ``current_task_id`` and ``last_seen`` are updated, and the
        matching intent-graph node (if any) gets ``assigned_to`` set.

        If the task was owned by a different agent that still pointed at it,
        that agent is released so it does not stay busy with a task it no
        longer owns.

        ``context`` is accepted for interface compatibility; it is currently
        not stored anywhere.

        Returns True if assignment successful, False otherwise.
        """
        agent = self.agents.get(agent_id)
        if agent is None:
            return False

        # Same availability rule as find_available_agent: the agent must be
        # active and must not already hold a task.
        if not agent.active or agent.current_task_id:
            return False

        # Release a previous owner whose current task is the one being moved.
        previous_owner = self.agents.get(self.task_ownership.get(task_id))
        if previous_owner is not None and previous_owner.current_task_id == task_id:
            previous_owner.current_task_id = None

        self.task_ownership[task_id] = agent_id
        agent.current_task_id = task_id
        agent.last_seen = datetime.now()

        # Mirror the assignment into the intent graph; the node's status is
        # deliberately left untouched.
        if self.intent_graph and task_id in self.intent_graph.nodes:
            self.intent_graph.nodes[task_id].assigned_to = agent_id

        return True

    def find_available_agent(self, required_role: Optional[AgentRole] = None,
                             required_capabilities: Optional[Set[str]] = None) -> Optional[AgentInfo]:
        """
        Return the first registered agent that is free and meets the requirements.

        An agent qualifies when it is active, holds no task, has the requested
        role (if one is given) and its capabilities include every requested
        capability (if any are given). Agents are checked in registration
        order. Returns ``None`` when nobody qualifies.
        """
        for candidate in self.agents.values():
            if not candidate.active or candidate.current_task_id:
                continue
            if required_role and candidate.role != required_role:
                continue
            if required_capabilities and not required_capabilities.issubset(candidate.capabilities):
                continue
            return candidate

        return None

    def create_handoff(self, from_agent_id: str, to_agent_id: str,
                       task_id: str, handoff_type: HandoffType,
                       message: str = "", context: Optional[Dict] = None) -> Handoff:
        """
        Create and register a handoff of ``task_id`` from one agent to another.

        The handoff starts in status "pending". ``context`` is copied, so the
        caller's dict is never modified. When a context engine is attached,
        the task's title and its top five context items are added under
        ``handoff.context['cognitive_context']``.
        """
        handoff = Handoff(
            from_agent_id=from_agent_id,
            to_agent_id=to_agent_id,
            task_id=task_id,
            handoff_type=handoff_type,
            message=message,
            # Shallow copy: 'cognitive_context' is added below and must not
            # leak into the dict the caller passed in.
            context=dict(context) if context else {},
        )
        self.handoffs[handoff.id] = handoff

        if self.context_engine:
            bundle = self.context_engine.get_context_for_task(task_id)
            handoff.context['cognitive_context'] = {
                'goal': bundle.task_title,
                'top_items': [
                    {
                        'type': item.type,
                        'title': item.title,
                        'content': item.content,
                    }
                    for item in bundle.get_top_items(5)
                ],
            }

        return handoff

    def accept_handoff(self, handoff_id: str, agent_id: str) -> bool:
        """
        Let ``agent_id`` accept a handoff addressed to it.

        Returns ``False`` when the handoff is unknown, is addressed to another
        agent, or is no longer pending (already accepted, rejected or
        completed). On success the handoff is marked "accepted", task
        ownership moves to the accepting agent and, if that agent is
        registered, its ``current_task_id`` is set to the task.
        """
        handoff = self.handoffs.get(handoff_id)
        if handoff is None or handoff.to_agent_id != agent_id:
            return False

        # A handoff can be accepted exactly once; anything else would let a
        # finished or rejected handoff take the task back.
        if handoff.status != "pending":
            return False

        handoff.status = "accepted"
        handoff.accepted_at = datetime.now()

        self.task_ownership[handoff.task_id] = agent_id
        receiver = self.agents.get(agent_id)
        if receiver is not None:
            receiver.current_task_id = handoff.task_id

        return True

    def complete_handoff(self, handoff_id: str, result: Optional[Dict] = None) -> bool:
        """
        Mark a handoff as completed.

        A truthy ``result`` is stored under ``handoff.context['result']``. If
        the receiving agent is registered and is still on the handed-off task,
        it is freed. Returns ``False`` only when the handoff id is unknown.
        """
        handoff = self.handoffs.get(handoff_id)
        if handoff is None:
            return False

        handoff.status = "completed"
        handoff.completed_at = datetime.now()

        if result:
            handoff.context['result'] = result

        receiver = self.agents.get(handoff.to_agent_id)
        if receiver is not None and receiver.current_task_id == handoff.task_id:
            receiver.current_task_id = None

        return True

    def get_agent_context(self, agent_id: str) -> Dict[str, Any]:
        """
        Build a summary of everything relevant to one agent.

        The result contains:

        - ``agent``: id, name, role value and current task id
        - ``active_handoffs``: accepted handoffs addressed to the agent
        - ``pending_handoffs``: pending handoffs addressed to the agent
        - ``sessions``: sessions the agent is a member of
        - ``task_context``: only when the agent has a task and a context
          engine is attached

        An unknown agent id yields an empty dict.
        """
        agent = self.agents.get(agent_id)
        if agent is None:
            return {}

        incoming = [h for h in self.handoffs.values() if h.to_agent_id == agent_id]

        summary: Dict[str, Any] = {
            'agent': {
                'id': agent.id,
                'name': agent.name,
                'role': agent.role.value,
                'current_task': agent.current_task_id,
            },
            'active_handoffs': [
                {
                    'id': h.id,
                    'task_id': h.task_id,
                    'type': h.handoff_type.value,
                }
                for h in incoming if h.status == "accepted"
            ],
            'pending_handoffs': [
                {
                    'id': h.id,
                    'from_agent': h.from_agent_id,
                    'task_id': h.task_id,
                    'type': h.handoff_type.value,
                    'message': h.message,
                }
                for h in incoming if h.status == "pending"
            ],
            'sessions': [],
        }

        if agent.current_task_id and self.context_engine:
            bundle = self.context_engine.get_context_for_task(agent.current_task_id)
            summary['task_context'] = {
                'title': bundle.task_title,
                'top_items': [
                    {
                        'type': item.type,
                        'title': item.title,
                        'relevance': item.relevance_score,
                    }
                    for item in bundle.get_top_items(5)
                ],
            }

        summary['sessions'].extend(
            {
                'id': session.id,
                'goal': session.goal,
                'status': session.status,
                'is_coordinator': session.coordinator_id == agent_id,
            }
            for session in self.sessions.values()
            if agent_id in session.agents
        )

        return summary

    def suggest_collaboration(self, task_id: str) -> List[Dict[str, Any]]:
        """
        Suggest collaboration patterns for a task found in the intent graph.

        Up to three suggestions are produced, in this order:

        - "parallel" when the task has child tasks (at most 3 agents)
        - "plan_and_execute" when the description exceeds 500 characters
        - "code_and_review" when any of the task's file paths ends with a
          known source-code extension

        Returns an empty list when no intent graph is attached or the task is
        not one of its nodes.
        """
        suggestions: List[Dict[str, Any]] = []

        if not self.intent_graph or task_id not in self.intent_graph.nodes:
            return suggestions

        task = self.intent_graph.nodes[task_id]

        if task.child_ids:
            suggestions.append({
                "pattern": "parallel",
                "description": f"Task has {len(task.child_ids)} subtasks. Assign to multiple agents for parallel execution.",
                "agents_needed": min(len(task.child_ids), 3),
                "roles": [AgentRole.EXECUTOR.value],
            })

        # A long description is used as a proxy for task complexity.
        if task.description and len(task.description) > 500:
            suggestions.append({
                "pattern": "plan_and_execute",
                "description": "Complex task. Use planner to break down, then executor to implement.",
                "agents_needed": 2,
                "roles": [AgentRole.PLANNER.value, AgentRole.EXECUTOR.value],
            })

        # str.endswith accepts a tuple of suffixes, so one call per path suffices.
        code_extensions = ('.py', '.js', '.java', '.cpp', '.go', '.rs')
        if any(path.endswith(code_extensions) for path in task.file_paths):
            suggestions.append({
                "pattern": "code_and_review",
                "description": "Task involves code. Use coder + reviewer for quality.",
                "agents_needed": 2,
                "roles": [AgentRole.CODER.value, AgentRole.REVIEWER.value],
            })

        return suggestions

    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """
        Report the current state of a collaboration session.

        The report holds the session id, goal and status, agent counts (total
        members and members that are active with a task), task counts (total
        and per status value from the intent graph), the completion ratio and
        the whole number of seconds elapsed since the session was created.

        An unknown session id yields an empty dict.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return {}

        # Tally tasks per status value; tasks missing from the graph are skipped.
        tasks_by_status: Dict[str, int] = {}
        if self.intent_graph:
            for task_id in session.task_ids:
                if task_id in self.intent_graph.nodes:
                    status = self.intent_graph.nodes[task_id].status.value
                    tasks_by_status[status] = tasks_by_status.get(status, 0) + 1

        busy_agents = [
            member_id
            for member_id in session.agents
            if member_id in self.agents
            and self.agents[member_id].active
            and self.agents[member_id].current_task_id
        ]

        # total_seconds() covers the whole interval; the .seconds attribute
        # would drop whole days and wrap around every 24 hours.
        if session.created_at:
            elapsed = int((datetime.now() - session.created_at).total_seconds())
        else:
            elapsed = 0

        return {
            'session_id': session.id,
            'goal': session.goal,
            'status': session.status,
            'agents': {
                'total': len(session.agents),
                'active': len(busy_agents),
            },
            'tasks': {
                'total': len(session.task_ids),
                'by_status': tasks_by_status,
            },
            'progress': self._calculate_progress(session.task_ids),
            'duration': elapsed,
        }

    def _calculate_progress(self, task_ids: Set[str]) -> float:
        """
        Return the fraction of ``task_ids`` whose graph node is "completed".

        Tasks missing from the intent graph count as not completed. Returns
        0.0 when there are no tasks or no intent graph is attached.
        """
        if not task_ids or not self.intent_graph:
            return 0.0

        nodes = self.intent_graph.nodes
        done = sum(
            1
            for task_id in task_ids
            if task_id in nodes and nodes[task_id].status.value == "completed"
        )
        return done / len(task_ids)

    def resolve_conflict(self, task_id: str, agent1_id: str, agent2_id: str) -> str:
        """
        Decide which agent gets a task that two agents both want.

        Strategy, in order:

        1. If the task already has an owner, that owner wins (first come,
           first served), even if it is neither of the two contenders.
        2. If both contenders are registered, the one owning fewer tasks in
           the ownership table wins; ties go to ``agent1_id``.
        3. Otherwise ``agent1_id`` wins.

        Returns the id of the winning agent.
        """
        if task_id in self.task_ownership:
            return self.task_ownership[task_id]

        if agent1_id in self.agents and agent2_id in self.agents:
            owners = list(self.task_ownership.values())
            if owners.count(agent1_id) <= owners.count(agent2_id):
                return agent1_id
            return agent2_id

        return agent1_id

    def export_coordination_state(self, file_path: str) -> None:
        """
        Write agents, sessions and task ownership to ``file_path`` as JSON.

        Only a subset of each record is exported (see the keys below);
        handoffs are not included. The file is written as UTF-8.
        """
        data = {
            'agents': {
                agent_id: {
                    'id': agent.id,
                    'name': agent.name,
                    'role': agent.role.value,
                    'current_task': agent.current_task_id,
                    'active': agent.active,
                }
                for agent_id, agent in self.agents.items()
            },
            'sessions': {
                session_id: {
                    'id': session.id,
                    'goal': session.goal,
                    # Sets are not JSON-serializable, hence the lists.
                    'agents': list(session.agents),
                    'tasks': list(session.task_ids),
                    'status': session.status,
                }
                for session_id, session in self.sessions.items()
            },
            'task_ownership': self.task_ownership,
        }

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
|
|
|
|
|
|
# Example usage and collaboration patterns
if __name__ == "__main__":
    # Sketch of a multi-agent code review workflow. It is kept as comments
    # because it needs an intent graph and a context engine to run.
    #
    # 1. Build the coordinator:
    #        coordinator = AgentCoordinator(intent_graph, context_engine)
    #
    # 2. Register the agents:
    #        coder = AgentInfo(name="CodeWriter", role=AgentRole.CODER)
    #        reviewer = AgentInfo(name="CodeReviewer", role=AgentRole.REVIEWER)
    #        coordinator.register_agent(coder)
    #        coordinator.register_agent(reviewer)
    #
    # 3. Open a session:
    #        session = coordinator.create_session(
    #            goal="Implement user authentication feature",
    #            description="Add login, signup, and password reset"
    #        )
    #
    # 4. Assign the work:
    #        coordinator.assign_task("implement-login", coder.id)
    #
    # 5. Once the coder is done, hand the task off to the reviewer:
    #        handoff = coordinator.create_handoff(
    #            from_agent_id=coder.id,
    #            to_agent_id=reviewer.id,
    #            task_id="implement-login",
    #            handoff_type=HandoffType.REVIEW,
    #            message="Login implementation complete, ready for review"
    #        )

    print("Agent Coordination Protocol initialized")
|