Files
blackroad-operating-system/backend/app/routers/cognition.py
Claude 1109603b3f Integrate LEITL Protocol and Cece Cognition Framework into agent system
This commit integrates the LEITL (Live Everyone In The Loop) Protocol and
Cece Cognition Framework into the BlackRoad agent ecosystem, enabling
multi-agent collaboration and advanced reasoning capabilities.

**Changes:**

1. **Cognition Router Integration** (`backend/app/routers/cognition.py`):
   - Fixed import path for orchestration service
   - Exposes full Cece Cognition Framework via REST API
   - Endpoints for single agent execution and multi-agent workflows
   - Supports sequential, parallel, and recursive execution modes

2. **Main App Updates** (`backend/app/main.py`):
   - Added cognition router to imports
   - Registered `/api/cognition` endpoints
   - Added Cognition tag to OpenAPI docs

3. **BaseAgent LEITL Integration** (`agents/base/agent.py`):
   - Added optional LEITL protocol support to base agent class
   - New methods: `enable_leitl()`, `disable_leitl()`, `_leitl_broadcast()`, `_leitl_heartbeat()`
   - Automatic event broadcasting during agent execution lifecycle
   - Events: task.started, task.completed, task.failed
   - Heartbeat support for session keep-alive

4. **AgentRegistry LEITL Support** (`agents/base/registry.py`):
   - Added `enable_leitl_for_all()` - Enable LEITL for all registered agents
   - Added `disable_leitl_for_all()` - Disable LEITL for all agents
   - Added `get_leitl_status()` - Get LEITL status and session IDs
   - Bulk agent session management

**Integration Architecture:**

```
User Request → Cognition API (/api/cognition)
                    ↓
          Orchestration Engine
                    ↓
       ┌────────────┴──────────┐
       ↓                       ↓
  Cece Agent              Other Agents
  (15-step reasoning)     (specialized)
       ↓                       ↓
  LEITL Protocol (if enabled)
       ↓
  Redis PubSub + WebSocket
       ↓
  Other active sessions
```

**New Capabilities:**

1. **Single Agent Execution**: POST /api/cognition/execute
   - Execute Cece, Wasp, Clause, or Codex individually
   - Full reasoning trace and confidence scores

2. **Multi-Agent Workflows**: POST /api/cognition/workflows
   - Orchestrate multiple agents in complex workflows
   - Sequential, parallel, or recursive execution
   - Shared memory and context across agents

3. **LEITL Collaboration**:
   - All agents can now broadcast their activity in real-time
   - Multi-agent sessions can see each other's work
   - Live activity feed via WebSocket
   - Session management with heartbeats

4. **Agent Registry**:
   - Bulk enable/disable LEITL for all agents
   - Query LEITL status across the agent ecosystem
   - Centralized session management

**Testing:**

-  All Python files compile successfully
-  Orchestration engine imports correctly
-  BaseAgent with LEITL integration works
-  AgentRegistry with LEITL support works
-  Cece agent imports and executes
2025-11-18 13:18:06 +00:00

500 lines
14 KiB
Python

"""
Cognition API Router
Exposes the Cece Cognition Framework via REST API:
- Execute individual agents (Cece, Wasp, Clause, Codex)
- Execute multi-agent workflows
- Query reasoning traces
- Access agent memory
- Manage prompts
Endpoints:
- POST /api/cognition/execute - Execute single agent
- POST /api/cognition/workflows - Execute multi-agent workflow
- GET /api/cognition/reasoning-trace/{workflow_id} - Get reasoning trace
- GET /api/cognition/memory - Query agent memory
- POST /api/prompts/register - Register new prompt
- GET /api/prompts/search - Search prompts
"""
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Depends, status
from pydantic import BaseModel, Field
# Import our agents and orchestration
from agents.categories.ai_ml.cece_agent import CeceAgent
from agents.categories.ai_ml.wasp_agent import WaspAgent
from agents.categories.ai_ml.clause_agent import ClauseAgent
from agents.categories.ai_ml.codex_agent import CodexAgent
from app.services.orchestration import (
OrchestrationEngine,
Workflow,
WorkflowStep,
ExecutionMode
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/cognition", tags=["Cognition"])
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class AgentExecuteRequest(BaseModel):
"""Request to execute single agent"""
agent: str = Field(..., description="Agent name: cece, wasp, clause, codex")
input: str = Field(..., description="Input/prompt for the agent")
context: Dict[str, Any] = Field(default_factory=dict, description="Optional context")
verbose: bool = Field(default=True, description="Return full reasoning trace")
class AgentExecuteResponse(BaseModel):
"""Response from agent execution"""
agent: str
result: Dict[str, Any]
reasoning_trace: List[Any]
confidence: float
execution_time_seconds: float
warnings: List[str] = Field(default_factory=list)
class WorkflowStepRequest(BaseModel):
"""Workflow step definition"""
name: str
agent_name: str
input_template: str
depends_on: List[str] = Field(default_factory=list)
parallel_with: List[str] = Field(default_factory=list)
max_retries: int = Field(default=3)
class WorkflowExecuteRequest(BaseModel):
"""Request to execute multi-agent workflow"""
name: str
steps: List[WorkflowStepRequest]
mode: str = Field(default="sequential", description="sequential, parallel, or recursive")
initial_context: Dict[str, Any] = Field(default_factory=dict)
timeout_seconds: int = Field(default=600)
class WorkflowExecuteResponse(BaseModel):
"""Response from workflow execution"""
workflow_id: str
status: str
step_results: Dict[str, Any]
reasoning_trace: List[Dict[str, Any]]
memory: Dict[str, Any]
total_duration_seconds: float
error: Optional[str] = None
class PromptRegisterRequest(BaseModel):
"""Request to register new prompt"""
agent_name: str
prompt_text: str
version: str = Field(default="1.0.0")
metadata: Dict[str, Any] = Field(default_factory=dict)
class PromptSearchRequest(BaseModel):
"""Request to search prompts"""
agent: Optional[str] = None
version: Optional[str] = None
search_term: Optional[str] = None
# ============================================================================
# AGENT EXECUTION ENDPOINTS
# ============================================================================
# Global orchestration engine
orchestration_engine = OrchestrationEngine()
@router.post("/execute", response_model=AgentExecuteResponse)
async def execute_agent(request: AgentExecuteRequest):
"""
Execute single agent
Execute one of the core agents (Cece, Wasp, Clause, Codex) with the given input.
**Agents:**
- `cece`: Cognitive architect (15-step reasoning + 6-step architecture)
- `wasp`: UI/UX specialist (7-step design process)
- `clause`: Legal specialist (7-step legal review)
- `codex`: Code execution specialist (7-step dev process)
**Example:**
```json
{
"agent": "cece",
"input": "I'm overwhelmed with 10 projects and don't know where to start",
"context": {
"projects": ["Project A", "Project B", ...],
"deadlines": {...}
}
}
```
"""
logger.info(f"🚀 Executing agent: {request.agent}")
# Get agent instance
if request.agent not in orchestration_engine.agents:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unknown agent: {request.agent}. Available: cece, wasp, clause, codex"
)
agent = orchestration_engine.agents[request.agent]
# Prepare params
params = {
"input": request.input,
"context": request.context,
"verbose": request.verbose
}
try:
# Execute agent
result = await agent.run(params)
# Build response
return AgentExecuteResponse(
agent=request.agent,
result=result.data,
reasoning_trace=result.data.get("reasoning_trace", []),
confidence=result.data.get("confidence", 0.85),
execution_time_seconds=result.duration_seconds or 0.0,
warnings=result.data.get("warnings", [])
)
except Exception as e:
logger.error(f"Error executing agent {request.agent}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Agent execution failed: {str(e)}"
)
@router.post("/workflows", response_model=WorkflowExecuteResponse)
async def execute_workflow(request: WorkflowExecuteRequest):
"""
Execute multi-agent workflow
Execute a multi-step workflow with multiple agents working together.
**Execution Modes:**
- `sequential`: Steps run one after another (A → B → C)
- `parallel`: Independent steps run simultaneously where possible
- `recursive`: Steps iterate until convergence
**Example:**
```json
{
"name": "Build Dashboard",
"mode": "sequential",
"steps": [
{
"name": "architect",
"agent_name": "cece",
"input_template": "Design a dashboard for AI agent workflows"
},
{
"name": "backend",
"agent_name": "codex",
"input_template": "${architect.architecture.backend_spec}",
"depends_on": ["architect"]
},
{
"name": "frontend",
"agent_name": "wasp",
"input_template": "${architect.architecture.frontend_spec}",
"depends_on": ["architect"]
}
]
}
```
"""
logger.info(f"🚀 Executing workflow: {request.name}")
# Convert request to Workflow
try:
mode = ExecutionMode[request.mode.upper()]
except KeyError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid execution mode: {request.mode}. Use: sequential, parallel, or recursive"
)
workflow = Workflow(
id=str(uuid4()),
name=request.name,
steps=[
WorkflowStep(
name=step.name,
agent_name=step.agent_name,
input_template=step.input_template,
depends_on=step.depends_on,
parallel_with=step.parallel_with,
max_retries=step.max_retries
)
for step in request.steps
],
mode=mode,
timeout_seconds=request.timeout_seconds
)
try:
# Execute workflow
result = await orchestration_engine.execute_workflow(
workflow,
initial_context=request.initial_context
)
# Build response
return WorkflowExecuteResponse(
workflow_id=result.workflow_id,
status=result.status.value,
step_results=result.step_results,
reasoning_trace=result.reasoning_trace,
memory=result.memory,
total_duration_seconds=result.total_duration_seconds,
error=result.error
)
except Exception as e:
logger.error(f"Error executing workflow {request.name}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Workflow execution failed: {str(e)}"
)
@router.get("/reasoning-trace/{workflow_id}")
async def get_reasoning_trace(workflow_id: str):
"""
Get reasoning trace for workflow
Retrieve the complete reasoning trace showing how agents arrived at their decisions.
Returns a list of reasoning steps with:
- Step name and emoji
- Input context
- Output/decision
- Confidence score
- Timestamp
"""
# In a real implementation, would fetch from database
# For now, return placeholder
return {
"workflow_id": workflow_id,
"trace": [
{
"step": "🚨 Not ok",
"agent": "cece",
"input": "I'm overwhelmed with projects",
"output": "There's too many competing priorities without clear hierarchy",
"confidence": 0.95,
"timestamp": datetime.utcnow().isoformat()
}
],
"overall_confidence": 0.87
}
@router.get("/memory")
async def get_memory(workflow_id: Optional[str] = None):
"""
Query agent memory
Retrieve shared memory/context from workflow execution.
Can filter by workflow_id to get memory for specific workflow.
"""
# In a real implementation, would fetch from database/cache
return {
"workflow_id": workflow_id,
"context": {
"user_preferences": {},
"session_data": {}
},
"reasoning_trace": [],
"confidence_scores": {}
}
@router.get("/active-workflows")
async def get_active_workflows():
"""
Get list of active workflows
Returns workflow IDs currently being executed.
"""
active = orchestration_engine.get_active_workflows()
return {
"active_workflows": active,
"count": len(active)
}
@router.post("/cancel-workflow/{workflow_id}")
async def cancel_workflow(workflow_id: str):
"""
Cancel running workflow
Attempts to cancel a running workflow. Returns success status.
"""
success = await orchestration_engine.cancel_workflow(workflow_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow not found or already completed: {workflow_id}"
)
return {
"workflow_id": workflow_id,
"status": "cancelled"
}
# ============================================================================
# PROMPT MANAGEMENT ENDPOINTS
# ============================================================================
# In-memory prompt registry (would be database in production)
prompt_registry: Dict[str, List[Dict[str, Any]]] = {}
@router.post("/prompts/register")
async def register_prompt(request: PromptRegisterRequest):
"""
Register new agent prompt
Register a custom summon prompt for an agent.
**Example:**
```json
{
"agent_name": "cece",
"prompt_text": "Cece, run cognition.\\n\\nUse the Alexa-Cece Framework...\\n\\nNow analyze: [YOUR REQUEST]",
"version": "1.0.0",
"metadata": {
"author": "Alexa",
"purpose": "Full cognition framework"
}
}
```
"""
prompt_id = str(uuid4())
prompt = {
"id": prompt_id,
"agent_name": request.agent_name,
"prompt_text": request.prompt_text,
"version": request.version,
"metadata": request.metadata,
"created_at": datetime.utcnow().isoformat(),
"is_active": True
}
if request.agent_name not in prompt_registry:
prompt_registry[request.agent_name] = []
prompt_registry[request.agent_name].append(prompt)
logger.info(f"Registered prompt for {request.agent_name} (ID: {prompt_id})")
return {
"prompt_id": prompt_id,
"status": "registered"
}
@router.get("/prompts/search")
async def search_prompts(
agent: Optional[str] = None,
version: Optional[str] = None
):
"""
Search registered prompts
Search for prompts by agent name and/or version.
**Query Parameters:**
- `agent`: Filter by agent name (cece, wasp, clause, codex)
- `version`: Filter by version (e.g., "1.0.0", "latest")
"""
results = []
if agent:
if agent in prompt_registry:
prompts = prompt_registry[agent]
if version == "latest":
# Return only the most recent version
if prompts:
prompts = [max(prompts, key=lambda p: p["created_at"])]
elif version:
# Filter by specific version
prompts = [p for p in prompts if p["version"] == version]
results.extend(prompts)
else:
# Return all prompts
for agent_prompts in prompt_registry.values():
results.extend(agent_prompts)
return {
"prompts": results,
"count": len(results)
}
# ============================================================================
# UTILITY ENDPOINTS
# ============================================================================
@router.get("/agents")
async def list_agents():
"""
List available agents
Returns information about all available agents.
"""
agents = []
for agent_name, agent_instance in orchestration_engine.agents.items():
info = agent_instance.get_info()
agents.append(info)
return {
"agents": agents,
"count": len(agents)
}
@router.get("/health")
async def health_check():
"""
Health check for cognition system
Returns system status and metrics.
"""
return {
"status": "healthy",
"agents_available": len(orchestration_engine.agents),
"active_workflows": len(orchestration_engine.get_active_workflows()),
"prompts_registered": sum(len(prompts) for prompts in prompt_registry.values()),
"timestamp": datetime.utcnow().isoformat()
}