mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-16 23:57:10 -05:00
This commit integrates the LEITL (Live Everyone In The Loop) Protocol and
Cece Cognition Framework into the BlackRoad agent ecosystem, enabling
multi-agent collaboration and advanced reasoning capabilities.
**Changes:**
1. **Cognition Router Integration** (`backend/app/routers/cognition.py`):
- Fixed import path for orchestration service
- Exposes full Cece Cognition Framework via REST API
- Endpoints for single agent execution and multi-agent workflows
- Supports sequential, parallel, and recursive execution modes
2. **Main App Updates** (`backend/app/main.py`):
- Added cognition router to imports
- Registered `/api/cognition` endpoints
- Added Cognition tag to OpenAPI docs
3. **BaseAgent LEITL Integration** (`agents/base/agent.py`):
- Added optional LEITL protocol support to base agent class
- New methods: `enable_leitl()`, `disable_leitl()`, `_leitl_broadcast()`, `_leitl_heartbeat()`
- Automatic event broadcasting during agent execution lifecycle
- Events: task.started, task.completed, task.failed
- Heartbeat support for session keep-alive
4. **AgentRegistry LEITL Support** (`agents/base/registry.py`):
- Added `enable_leitl_for_all()` - Enable LEITL for all registered agents
- Added `disable_leitl_for_all()` - Disable LEITL for all agents
- Added `get_leitl_status()` - Get LEITL status and session IDs
- Bulk agent session management
**Integration Architecture:**
```
User Request → Cognition API (/api/cognition)
                             ↓
                    Orchestration Engine
                             ↓
                ┌────────────┴──────────┐
                ↓                       ↓
           Cece Agent             Other Agents
       (15-step reasoning)        (specialized)
                ↓                       ↓
                LEITL Protocol (if enabled)
                             ↓
                  Redis PubSub + WebSocket
                             ↓
                   Other active sessions
```
**New Capabilities:**
1. **Single Agent Execution**: POST /api/cognition/execute
- Execute Cece, Wasp, Clause, or Codex individually
- Full reasoning trace and confidence scores
2. **Multi-Agent Workflows**: POST /api/cognition/workflows
- Orchestrate multiple agents in complex workflows
- Sequential, parallel, or recursive execution
- Shared memory and context across agents
3. **LEITL Collaboration**:
- All agents can now broadcast their activity in real-time
- Multi-agent sessions can see each other's work
- Live activity feed via WebSocket
- Session management with heartbeats
4. **Agent Registry**:
- Bulk enable/disable LEITL for all agents
- Query LEITL status across the agent ecosystem
- Centralized session management
**Testing:**
- ✅ All Python files compile successfully
- ✅ Orchestration engine imports correctly
- ✅ BaseAgent with LEITL integration works
- ✅ AgentRegistry with LEITL support works
- ✅ Cece agent imports and executes
457 lines
14 KiB
Python
457 lines
14 KiB
Python
"""
|
|
Base Agent Class
|
|
|
|
The foundation for all BlackRoad agents. Provides common functionality,
|
|
error handling, logging, and execution framework.
|
|
|
|
Integrations:
|
|
- LEITL Protocol: Multi-agent collaboration with live session management
|
|
- Cognition Framework: Access to Cece's reasoning and architecture capabilities
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
from uuid import uuid4
|
|
|
|
|
|
class AgentStatus(Enum):
    """Execution states an agent can be in; each value is its lowercase name."""

    # State assigned to a freshly constructed agent.
    IDLE = "idle"
    # Execution is in progress.
    RUNNING = "running"
    # Execution finished without raising.
    COMPLETED = "completed"
    # Execution ended with an error.
    FAILED = "failed"
    # Execution was cancelled.
    CANCELLED = "cancelled"
|
|
|
|
|
|
@dataclass
class AgentMetadata:
    """Descriptive and execution settings attached to an agent.

    The first four fields are required; the rest have defaults. List
    fields use ``default_factory`` so every instance owns its own list.
    """

    # --- identity (required) ---
    name: str
    description: str
    category: str
    version: str

    # --- optional descriptive fields ---
    author: str = "BlackRoad"
    tags: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)

    # --- execution settings ---
    timeout: int = 300  # seconds
    retry_count: int = 3
    retry_delay: int = 5  # seconds
|
|
|
|
|
|
@dataclass
class AgentResult:
    """Outcome record for a single agent execution.

    Only the agent name, execution id and status are required; the
    remaining fields default to ``None`` (or an empty dict for
    ``metadata``).
    """

    # --- required ---
    agent_name: str
    execution_id: str
    status: AgentStatus

    # --- payload: result data or error text ---
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    # --- timing ---
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    duration_seconds: Optional[float] = None

    # --- free-form extras; a fresh dict per instance ---
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class BaseAgent(ABC):
|
|
"""
|
|
Base class for all BlackRoad agents.
|
|
|
|
Provides:
|
|
- Lifecycle management (initialize, execute, cleanup)
|
|
- Error handling and retries
|
|
- Logging and telemetry
|
|
- Input validation
|
|
- Configuration management
|
|
|
|
Example:
|
|
```python
|
|
class MyAgent(BaseAgent):
|
|
def __init__(self):
|
|
super().__init__(
|
|
name='my-agent',
|
|
description='My custom agent',
|
|
category='custom',
|
|
version='1.0.0'
|
|
)
|
|
|
|
async def execute(self, params):
|
|
# Your logic here
|
|
return {'result': 'success'}
|
|
```
|
|
"""
|
|
|
|
def __init__(
    self,
    name: str,
    description: str,
    category: str,
    version: str,
    **kwargs
):
    """
    Set up metadata, logger, hook registry and LEITL state.

    Args:
        name: Agent name; also used for the logger name ``agent.<name>``
        description: Human-readable description
        category: Agent category
        version: Agent version string
        **kwargs: Forwarded unchanged to ``AgentMetadata``
    """
    metadata_fields = dict(
        name=name,
        description=description,
        category=category,
        version=version,
    )
    self.metadata = AgentMetadata(**metadata_fields, **kwargs)

    self.status = AgentStatus.IDLE
    self.logger = logging.getLogger(f"agent.{name}")
    self._execution_id: Optional[str] = None

    # One empty callback list per supported lifecycle hook.
    hook_names = ('before_execute', 'after_execute', 'on_error', 'on_success')
    self._hooks: Dict[str, List[Callable]] = {
        hook_name: [] for hook_name in hook_names
    }

    # LEITL Protocol integration (optional); the protocol object stays
    # None until LEITL is enabled.
    self._leitl_enabled: bool = False
    self._leitl_session_id: Optional[str] = None
    self._leitl_protocol = None
|
|
|
|
@abstractmethod
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Perform the agent's work; subclasses must implement this.

    Args:
        params: Input parameters for the agent

    Returns:
        Dictionary containing execution results

    Raises:
        Exception: If execution fails
    """
|
|
|
|
async def initialize(self) -> None:
    """
    Prepare the agent before execution.

    The base implementation only logs; override it to add custom
    initialization logic.
    """
    agent_name = self.metadata.name
    self.logger.info(f"Initializing agent: {agent_name}")
|
|
|
|
async def cleanup(self) -> None:
    """
    Release resources after execution.

    The base implementation only logs; override it to add custom
    cleanup logic.
    """
    agent_name = self.metadata.name
    self.logger.info(f"Cleaning up agent: {agent_name}")
|
|
|
|
def validate_params(self, params: Dict[str, Any]) -> bool:
    """
    Check the input parameters before execution.

    The base implementation accepts everything; override it to add
    custom validation logic.

    Args:
        params: Parameters to validate

    Returns:
        True if valid, False otherwise
    """
    # No constraints at this level: every input is considered valid.
    return True
|
|
|
|
async def run(self, params: Dict[str, Any]) -> AgentResult:
    """
    Run the agent with full lifecycle management.

    Success path, in order: validate params, initialize, 'before_execute'
    hooks, LEITL heartbeat, execute (with retries), 'on_success' hooks,
    cleanup, 'after_execute' hooks, LEITL 'task.completed' broadcast.

    Any ``Exception`` raised along the way is logged, passed to the
    'on_error' hooks, broadcast as 'task.failed' and reported through a
    FAILED result instead of propagating. If ``initialize()`` had already
    completed when the failure happened, ``cleanup()`` is still invoked
    (best effort) so resources acquired during initialization are not
    leaked.

    Args:
        params: Input parameters

    Returns:
        AgentResult containing execution details

    Raises:
        asyncio.CancelledError: Re-raised after the status is set to
            CANCELLED, so task cancellation keeps working.
    """
    execution_id = str(uuid4())
    self._execution_id = execution_id
    started_at = datetime.utcnow()

    self.logger.info(
        f"Starting agent execution: {self.metadata.name} "
        f"(ID: {execution_id})"
    )

    # params has not been validated yet, so tolerate objects without a
    # dict-style .get() here; validation below decides whether the run
    # proceeds.
    try:
        description = str(params.get("input", ""))[:200]
    except (AttributeError, TypeError):
        description = ""

    # Broadcast task started via LEITL (if enabled)
    await self._leitl_broadcast(
        "task.started",
        {
            "task_id": execution_id,
            "agent": self.metadata.name,
            "description": description
        }
    )

    # True between a successful initialize() and the cleanup() call of
    # the success path; tells the finally block whether cleanup is owed.
    needs_cleanup = False

    try:
        # Validate params
        if not self.validate_params(params):
            raise ValueError("Invalid parameters provided")

        # Initialize
        await self.initialize()
        needs_cleanup = True

        # Run before hooks
        await self._run_hooks('before_execute', params)

        # Execute with retries
        self.status = AgentStatus.RUNNING

        # Send heartbeat
        await self._leitl_heartbeat(f"Executing {self.metadata.name}")

        result_data = await self._execute_with_retry(params)

        # Run success hooks
        await self._run_hooks('on_success', result_data)

        # Cleanup. The flag is cleared first so that a failing cleanup()
        # is reported once and not invoked a second time in finally.
        needs_cleanup = False
        await self.cleanup()

        # Run after hooks
        await self._run_hooks('after_execute', result_data)

        self.status = AgentStatus.COMPLETED
        completed_at = datetime.utcnow()
        duration = (completed_at - started_at).total_seconds()

        self.logger.info(
            f"Agent execution completed: {self.metadata.name} "
            f"(Duration: {duration:.2f}s)"
        )

        # Broadcast task completed via LEITL (if enabled)
        await self._leitl_broadcast(
            "task.completed",
            {
                "task_id": execution_id,
                "agent": self.metadata.name,
                "duration": duration,
                "status": "success"
            }
        )

        return AgentResult(
            agent_name=self.metadata.name,
            execution_id=execution_id,
            status=AgentStatus.COMPLETED,
            data=result_data,
            started_at=started_at,
            completed_at=completed_at,
            duration_seconds=duration
        )

    except asyncio.CancelledError:
        # On Python 3.8+ CancelledError derives from BaseException, so
        # the generic handler below never sees it; without this branch
        # the status would stay RUNNING after a cancellation.
        self.status = AgentStatus.CANCELLED
        self.logger.warning(
            f"Agent execution cancelled: {self.metadata.name} "
            f"(ID: {execution_id})"
        )
        raise

    except Exception as e:
        self.status = AgentStatus.FAILED
        completed_at = datetime.utcnow()
        duration = (completed_at - started_at).total_seconds()

        error_msg = f"Agent execution failed: {str(e)}"
        self.logger.error(error_msg, exc_info=True)

        # Run error hooks
        await self._run_hooks('on_error', {'error': str(e)})

        # Broadcast task failed via LEITL (if enabled)
        await self._leitl_broadcast(
            "task.failed",
            {
                "task_id": execution_id,
                "agent": self.metadata.name,
                "error": str(e),
                "duration": duration
            }
        )

        return AgentResult(
            agent_name=self.metadata.name,
            execution_id=execution_id,
            status=AgentStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=completed_at,
            duration_seconds=duration
        )

    finally:
        # Failure or cancellation after initialize(): release resources.
        # Errors are only logged so they cannot mask the original outcome.
        if needs_cleanup:
            try:
                await self.cleanup()
            except Exception as cleanup_error:
                self.logger.error(
                    f"Cleanup after unsuccessful execution failed: "
                    f"{str(cleanup_error)}",
                    exc_info=True
                )
|
|
|
|
async def _execute_with_retry(
    self,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Execute with automatic retries on failure.

    ``execute()`` is attempted up to ``metadata.retry_count`` times, and
    always at least once. Each attempt is bounded by ``metadata.timeout``
    seconds, and ``metadata.retry_delay`` seconds are slept between
    attempts.

    Args:
        params: Input parameters passed to ``execute()``

    Returns:
        The dictionary returned by the first successful attempt

    Raises:
        asyncio.TimeoutError: If the last attempt timed out
        Exception: Whatever the last failed attempt raised
    """
    # A retry_count of 0 (or less) used to skip the loop entirely and
    # end in ``raise None``; always make one attempt instead.
    attempts = max(1, self.metadata.retry_count)
    last_exception: Optional[Exception] = None

    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(
                self.execute(params),
                timeout=self.metadata.timeout
            )
        except asyncio.TimeoutError:
            # Keep the timeout type (still an Exception subclass) and
            # attach a message that names the configured limit.
            last_exception = asyncio.TimeoutError(
                f"Agent execution timed out after "
                f"{self.metadata.timeout} seconds"
            )
            self.logger.warning(
                f"Attempt {attempt + 1}/{attempts} "
                f"timed out"
            )
        except asyncio.CancelledError:
            # Never retry a cancellation (on Python < 3.8 it would
            # otherwise match the generic handler below).
            raise
        except Exception as e:
            last_exception = e
            self.logger.warning(
                f"Attempt {attempt + 1}/{attempts} "
                f"failed: {str(e)}"
            )

        # No delay after the final attempt.
        if attempt < attempts - 1:
            await asyncio.sleep(self.metadata.retry_delay)

    raise last_exception
|
|
|
|
async def _run_hooks(
    self,
    hook_name: str,
    data: Dict[str, Any]
) -> None:
    """
    Run the callbacks registered under ``hook_name``, in registration order.

    Each callback is called as ``callback(agent, data)``. Coroutine
    functions and plain functions are both supported: the result is
    awaited only when it is awaitable. A failing callback is logged and
    does not stop the remaining callbacks or the caller.

    Args:
        hook_name: Name of the hook; unknown names run nothing
        data: Payload handed to every callback
    """
    # Function-scope import: the module's import block does not include it.
    import inspect

    for hook in self._hooks.get(hook_name, []):
        try:
            outcome = hook(self, data)
            # Awaiting the None returned by a synchronous callback would
            # raise TypeError and mislabel a hook that ran fine as failed.
            if inspect.isawaitable(outcome):
                await outcome
        except Exception as e:
            self.logger.error(
                f"Hook '{hook_name}' failed: {str(e)}",
                exc_info=True
            )
|
|
|
|
def register_hook(
    self,
    hook_name: str,
    callback: Callable
) -> None:
    """
    Attach a callback to one of the agent's lifecycle hooks.

    Args:
        hook_name: Name of the hook (before_execute, after_execute, etc.)
        callback: Async function to call

    Raises:
        ValueError: If ``hook_name`` is not a known hook
    """
    # Guard clause: reject names that have no callback list.
    if hook_name not in self._hooks:
        raise ValueError(f"Unknown hook: {hook_name}")

    callbacks = self._hooks[hook_name]
    callbacks.append(callback)
|
|
|
|
def get_info(self) -> Dict[str, Any]:
    """Return a dictionary describing the agent: metadata, status and LEITL state."""
    meta = self.metadata

    # Metadata fields copied one-to-one, in output order.
    leading_fields = ('name', 'description', 'category', 'version', 'author', 'tags')
    info: Dict[str, Any] = {key: getattr(meta, key) for key in leading_fields}

    info['status'] = self.status.value
    info['dependencies'] = meta.dependencies
    info['leitl_enabled'] = self._leitl_enabled
    info['leitl_session_id'] = self._leitl_session_id
    return info
|
|
|
|
async def enable_leitl(
    self,
    leitl_protocol=None,
    tags: Optional[List[str]] = None
) -> Optional[str]:
    """
    Enable LEITL protocol for this agent

    Registers a session with the protocol and remembers it on the agent.
    Calling this while LEITL is already enabled returns the existing
    session ID without registering again.

    Args:
        leitl_protocol: LEITL protocol instance (optional, will be imported if not provided)
        tags: Optional tags for session categorization; the agent's
            metadata tags are used when omitted or empty

    Returns:
        LEITL session ID, or None when the protocol cannot be imported
        or the session registration fails
    """
    if self._leitl_enabled:
        return self._leitl_session_id

    # Import LEITL protocol if not provided
    if leitl_protocol is None:
        try:
            from app.services.leitl_protocol import leitl_protocol as protocol
        except ImportError:
            self.logger.warning("LEITL protocol not available - running in standalone mode")
            return None
    else:
        protocol = leitl_protocol

    # Register session
    try:
        session = await protocol.register_session(
            agent_name=self.metadata.name,
            agent_type=self.metadata.category,
            tags=tags or self.metadata.tags
        )
        session_id = session.session_id
    except Exception as e:
        self.logger.error(f"Failed to enable LEITL: {str(e)}")
        return None

    # Commit state only after registration succeeded, so a failed attempt
    # does not leave a protocol object attached to a disabled agent.
    self._leitl_protocol = protocol
    self._leitl_session_id = session_id
    self._leitl_enabled = True

    self.logger.info(f"✅ LEITL enabled - Session ID: {session_id}")
    return session_id
|
|
|
|
async def disable_leitl(self) -> None:
    """
    Disable LEITL protocol and end session

    Does nothing when LEITL is not enabled. Otherwise the protocol is
    asked to end the session; whether or not that call succeeds, the
    agent's local LEITL state is reset so it stops using the session.
    Errors from the protocol are logged, not raised.
    """
    if not self._leitl_enabled:
        return

    try:
        if self._leitl_protocol and self._leitl_session_id:
            await self._leitl_protocol.end_session(self._leitl_session_id)
        self.logger.info("LEITL session ended")
    except Exception as e:
        self.logger.error(f"Error disabling LEITL: {str(e)}")
    finally:
        # Reset even when end_session() failed; otherwise the agent would
        # stay "enabled" with its old session id and could never be
        # disabled while the protocol keeps erroring.
        self._leitl_enabled = False
        self._leitl_session_id = None
|
|
|
|
async def _leitl_broadcast(
    self,
    event_type: str,
    data: Optional[Dict[str, Any]] = None
):
    """
    Publish an event through the LEITL protocol, best effort.

    Does nothing unless LEITL is enabled and a protocol object is set.
    Failures are logged as warnings and never raised to the caller.

    Args:
        event_type: Event name handed to the protocol
        data: Optional event payload
    """
    if not (self._leitl_enabled and self._leitl_protocol):
        return

    try:
        event = dict(
            event_type=event_type,
            session_id=self._leitl_session_id,
            data=data,
        )
        await self._leitl_protocol.broadcast_event(**event)
    except Exception as e:
        self.logger.warning(f"LEITL broadcast failed: {str(e)}")
|
|
|
|
async def _leitl_heartbeat(self, current_task: Optional[str] = None):
    """
    Send a heartbeat for this agent's LEITL session, best effort.

    Does nothing unless LEITL is enabled and a protocol object is set.
    Failures are logged as warnings and never raised to the caller.

    Args:
        current_task: Optional description of what the agent is doing
    """
    if not (self._leitl_enabled and self._leitl_protocol):
        return

    try:
        beat = dict(
            session_id=self._leitl_session_id,
            current_task=current_task,
        )
        await self._leitl_protocol.heartbeat(**beat)
    except Exception as e:
        self.logger.warning(f"LEITL heartbeat failed: {str(e)}")
|