Files
blackroad-operating-system/agents/base/registry.py
Claude 1109603b3f Integrate LEITL Protocol and Cece Cognition Framework into agent system
This commit integrates the LEITL (Live Everyone In The Loop) Protocol and
Cece Cognition Framework into the BlackRoad agent ecosystem, enabling
multi-agent collaboration and advanced reasoning capabilities.

**Changes:**

1. **Cognition Router Integration** (`backend/app/routers/cognition.py`):
   - Fixed import path for orchestration service
   - Exposes full Cece Cognition Framework via REST API
   - Endpoints for single agent execution and multi-agent workflows
   - Supports sequential, parallel, and recursive execution modes

2. **Main App Updates** (`backend/app/main.py`):
   - Added cognition router to imports
   - Registered `/api/cognition` endpoints
   - Added Cognition tag to OpenAPI docs

3. **BaseAgent LEITL Integration** (`agents/base/agent.py`):
   - Added optional LEITL protocol support to base agent class
   - New methods: `enable_leitl()`, `disable_leitl()`, `_leitl_broadcast()`, `_leitl_heartbeat()`
   - Automatic event broadcasting during agent execution lifecycle
   - Events: task.started, task.completed, task.failed
   - Heartbeat support for session keep-alive

4. **AgentRegistry LEITL Support** (`agents/base/registry.py`):
   - Added `enable_leitl_for_all()` - Enable LEITL for all registered agents
   - Added `disable_leitl_for_all()` - Disable LEITL for all agents
   - Added `get_leitl_status()` - Get LEITL status and session IDs
   - Bulk agent session management

**Integration Architecture:**

```
User Request → Cognition API (/api/cognition)
                    ↓
          Orchestration Engine
                    ↓
       ┌────────────┴──────────┐
       ↓                       ↓
  Cece Agent              Other Agents
  (15-step reasoning)     (specialized)
       ↓                       ↓
  LEITL Protocol (if enabled)
       ↓
  Redis PubSub + WebSocket
       ↓
  Other active sessions
```

**New Capabilities:**

1. **Single Agent Execution**: POST /api/cognition/execute
   - Execute Cece, Wasp, Clause, or Codex individually
   - Full reasoning trace and confidence scores

2. **Multi-Agent Workflows**: POST /api/cognition/workflows
   - Orchestrate multiple agents in complex workflows
   - Sequential, parallel, or recursive execution
   - Shared memory and context across agents

3. **LEITL Collaboration**:
   - All agents can now broadcast their activity in real-time
   - Multi-agent sessions can see each other's work
   - Live activity feed via WebSocket
   - Session management with heartbeats

4. **Agent Registry**:
   - Bulk enable/disable LEITL for all agents
   - Query LEITL status across the agent ecosystem
   - Centralized session management

**Testing:**

-  All Python files compile successfully
-  Orchestration engine imports correctly
-  BaseAgent with LEITL integration works
-  AgentRegistry with LEITL support works
-  Cece agent imports and executes
2025-11-18 13:18:06 +00:00

401 lines
12 KiB
Python

"""
Agent Registry
Centralized registry for agent discovery, registration, and management.
"""
import importlib
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Type
from .agent import BaseAgent
class AgentRegistry:
"""
Registry for managing and discovering agents.
Features:
- Agent registration and discovery
- Category-based organization
- Dynamic loading from filesystem
- Agent search and filtering
- Version management
Example:
```python
registry = AgentRegistry()
# Register an agent
registry.register(MyAgent())
# Get an agent
agent = registry.get_agent('my-agent')
# List all agents
all_agents = registry.list_agents()
# List by category
devops_agents = registry.list_agents(category='devops')
# Search agents
results = registry.search('deploy')
```
"""
def __init__(self, auto_discover: bool = True):
"""Initialize the registry."""
self.logger = logging.getLogger("agent.registry")
self._agents: Dict[str, BaseAgent] = {}
self._agents_by_category: Dict[str, List[str]] = {}
self._agent_classes: Dict[str, Type[BaseAgent]] = {}
if auto_discover:
self.discover_agents()
def register(
self,
agent: BaseAgent,
override: bool = False
) -> None:
"""
Register an agent.
Args:
agent: Agent instance to register
override: Allow overriding existing agent
Raises:
ValueError: If agent already exists and override=False
"""
agent_name = agent.metadata.name
if agent_name in self._agents and not override:
raise ValueError(
f"Agent '{agent_name}' already registered. "
f"Use override=True to replace."
)
self._agents[agent_name] = agent
# Add to category index
category = agent.metadata.category
if category not in self._agents_by_category:
self._agents_by_category[category] = []
if agent_name not in self._agents_by_category[category]:
self._agents_by_category[category].append(agent_name)
self.logger.info(f"Registered agent: {agent_name}")
def register_class(
self,
agent_class: Type[BaseAgent],
name: Optional[str] = None
) -> None:
"""
Register an agent class (lazy instantiation).
Args:
agent_class: Agent class to register
name: Optional name override
"""
agent_name = name or agent_class.__name__
self._agent_classes[agent_name] = agent_class
self.logger.info(f"Registered agent class: {agent_name}")
def unregister(self, agent_name: str) -> bool:
"""
Unregister an agent.
Args:
agent_name: Name of agent to remove
Returns:
True if removed, False if not found
"""
if agent_name in self._agents:
agent = self._agents[agent_name]
category = agent.metadata.category
del self._agents[agent_name]
# Remove from category index
if category in self._agents_by_category:
self._agents_by_category[category].remove(agent_name)
self.logger.info(f"Unregistered agent: {agent_name}")
return True
return False
def get_agent(
self,
agent_name: str,
create_new: bool = False
) -> Optional[BaseAgent]:
"""
Get an agent by name.
Args:
agent_name: Name of the agent
create_new: Create a new instance if it's a registered class
Returns:
Agent instance or None if not found
"""
# Check instances first
if agent_name in self._agents:
return self._agents[agent_name]
# Check classes
if create_new and agent_name in self._agent_classes:
agent_class = self._agent_classes[agent_name]
agent = agent_class()
self.register(agent)
return agent
return None
def list_agents(
self,
category: Optional[str] = None
) -> List[BaseAgent]:
"""
List all registered agents.
Args:
category: Filter by category (optional)
Returns:
List of agent instances
"""
if category:
agent_names = self._agents_by_category.get(category, [])
return [self._agents[name] for name in agent_names]
return list(self._agents.values())
def list_categories(self) -> List[str]:
"""Get list of all agent categories."""
return list(self._agents_by_category.keys())
def search(
self,
query: str,
search_in: List[str] = None
) -> List[BaseAgent]:
"""
Search for agents.
Args:
query: Search query
search_in: Fields to search in (name, description, tags)
Returns:
List of matching agents
"""
if search_in is None:
search_in = ['name', 'description', 'tags']
query_lower = query.lower()
results = []
for agent in self._agents.values():
matched = False
if 'name' in search_in:
if query_lower in agent.metadata.name.lower():
matched = True
if 'description' in search_in:
if query_lower in agent.metadata.description.lower():
matched = True
if 'tags' in search_in:
for tag in agent.metadata.tags:
if query_lower in tag.lower():
matched = True
break
if matched:
results.append(agent)
return results
def discover_agents(
self,
base_path: Optional[Path] = None
) -> int:
"""
Discover and register agents from filesystem.
Args:
base_path: Base directory to search (defaults to ./categories)
Returns:
Number of agents discovered
"""
if base_path is None:
# Get path relative to this file
current_dir = Path(__file__).parent.parent
base_path = current_dir / "categories"
if not base_path.exists():
self.logger.warning(
f"Agent directory not found: {base_path}"
)
return 0
discovered = 0
# Scan all category directories
for category_dir in base_path.iterdir():
if not category_dir.is_dir():
continue
category_name = category_dir.name
# Scan all Python files in the category
for agent_file in category_dir.glob("*.py"):
if agent_file.name.startswith("_"):
continue
try:
# Import the module
module_name = (
f"agents.categories.{category_name}."
f"{agent_file.stem}"
)
module = importlib.import_module(module_name)
# Find agent classes in the module
for attr_name in dir(module):
attr = getattr(module, attr_name)
# Check if it's a BaseAgent subclass
if (
isinstance(attr, type) and
issubclass(attr, BaseAgent) and
attr is not BaseAgent
):
# Instantiate and register
try:
agent = attr()
self.register(agent)
discovered += 1
except Exception as e:
self.logger.error(
f"Failed to instantiate {attr_name}: "
f"{str(e)}"
)
except Exception as e:
self.logger.error(
f"Failed to load {agent_file}: {str(e)}"
)
self.logger.info(f"Discovered {discovered} agents")
return discovered
def get_stats(self) -> Dict[str, Any]:
"""Get registry statistics."""
return {
'total_agents': len(self._agents),
'total_categories': len(self._agents_by_category),
'agents_by_category': {
cat: len(agents)
for cat, agents in self._agents_by_category.items()
},
'categories': list(self._agents_by_category.keys())
}
def export_manifest(self) -> Dict[str, Any]:
"""Export agent manifest."""
return {
'agents': [
agent.get_info()
for agent in self._agents.values()
],
'categories': self.list_categories(),
'stats': self.get_stats()
}
async def enable_leitl_for_all(
self,
leitl_protocol=None,
tags: Optional[List[str]] = None
) -> Dict[str, str]:
"""
Enable LEITL protocol for all registered agents
This allows all agents to participate in the Live Everyone In The Loop
multi-agent collaboration protocol.
Args:
leitl_protocol: LEITL protocol instance (optional, will be imported if not provided)
tags: Optional tags for sessions
Returns:
Dict mapping agent names to LEITL session IDs
"""
self.logger.info("🔗 Enabling LEITL for all agents...")
sessions = {}
for agent_name, agent in self._agents.items():
try:
session_id = await agent.enable_leitl(
leitl_protocol=leitl_protocol,
tags=tags
)
if session_id:
sessions[agent_name] = session_id
self.logger.info(f"{agent_name}: {session_id}")
except Exception as e:
self.logger.warning(f"{agent_name}: {str(e)}")
self.logger.info(f"✅ LEITL enabled for {len(sessions)}/{len(self._agents)} agents")
return sessions
async def disable_leitl_for_all(self):
"""Disable LEITL protocol for all agents"""
self.logger.info("🔌 Disabling LEITL for all agents...")
for agent_name, agent in self._agents.items():
try:
await agent.disable_leitl()
self.logger.debug(f"{agent_name}")
except Exception as e:
self.logger.warning(f"{agent_name}: {str(e)}")
self.logger.info("✅ LEITL disabled for all agents")
def get_leitl_status(self) -> Dict[str, Any]:
"""
Get LEITL status for all agents
Returns:
Dict with LEITL enabled counts and session IDs
"""
enabled_agents = {}
disabled_agents = []
for agent_name, agent in self._agents.items():
if agent._leitl_enabled:
enabled_agents[agent_name] = agent._leitl_session_id
else:
disabled_agents.append(agent_name)
return {
"leitl_enabled_count": len(enabled_agents),
"leitl_disabled_count": len(disabled_agents),
"enabled_agents": enabled_agents,
"disabled_agents": disabled_agents,
"total_agents": len(self._agents)
}