mirror of
https://github.com/blackboxprogramming/BlackRoad-Operating-System.git
synced 2026-03-17 01:57:11 -05:00
MASSIVE UPDATE - 271 new files ## Agent Library (208 agents across 10 categories) - DevOps (28 agents): deployment, monitoring, infrastructure - Engineering (30 agents): code generation, testing, documentation - Data (25 agents): ETL, analysis, visualization - Security (20 agents): scanning, compliance, threat detection - Finance (20 agents): trading, portfolio, risk analysis - Creative (20 agents): content generation, SEO, translation - Business (20 agents): CRM, automation, project management - Research (15 agents): literature review, experiments, analysis - Web (15 agents): scraping, API integration, webhooks - AI/ML (15 agents): training, deployment, monitoring ## Base Framework - BaseAgent class with lifecycle management - AgentExecutor with parallel/sequential/DAG execution - AgentRegistry with discovery and search - Configuration management - Comprehensive error handling and retries ## Python SDK - Production-ready pip-installable package - Sync and async clients - Full type hints and Pydantic models - Comprehensive examples and tests - Auth, Blockchain, and Agent clients ## TypeScript/JavaScript SDK - Production-ready npm-publishable package - Full TypeScript types - ESM + CommonJS dual package - Browser and Node.js support - Comprehensive examples and tests ## Backend Integration - /api/agents endpoints in FastAPI - Agent execution API - Agent discovery and search - Execution plans and orchestration Value: $5M+ worth of engineering work
325 lines
9.2 KiB
Python
325 lines
9.2 KiB
Python
"""
|
|
Agent Executor
|
|
|
|
Handles agent execution, orchestration, parallel execution,
|
|
and dependency management.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, field
|
|
|
|
from .agent import BaseAgent, AgentResult, AgentStatus
|
|
|
|
|
|
@dataclass
|
|
class ExecutionPlan:
|
|
"""Plan for executing multiple agents."""
|
|
agents: List[BaseAgent]
|
|
mode: str = "sequential" # sequential, parallel, or dag
|
|
max_concurrency: int = 5
|
|
stop_on_error: bool = True
|
|
|
|
|
|
@dataclass
|
|
class OrchestrationResult:
|
|
"""Result of orchestrated agent execution."""
|
|
plan_id: str
|
|
results: List[AgentResult]
|
|
status: str
|
|
started_at: datetime
|
|
completed_at: datetime
|
|
total_duration_seconds: float
|
|
succeeded: int = 0
|
|
failed: int = 0
|
|
|
|
|
|
class AgentExecutor:
|
|
"""
|
|
Executes agents with support for:
|
|
- Single agent execution
|
|
- Parallel execution
|
|
- Sequential execution
|
|
- DAG-based execution with dependencies
|
|
- Resource management
|
|
- Rate limiting
|
|
|
|
Example:
|
|
```python
|
|
executor = AgentExecutor()
|
|
|
|
# Execute single agent
|
|
result = await executor.execute(agent, params)
|
|
|
|
# Execute multiple agents in parallel
|
|
results = await executor.execute_parallel([agent1, agent2], params)
|
|
|
|
# Execute with dependencies
|
|
plan = ExecutionPlan(
|
|
agents=[agent1, agent2, agent3],
|
|
mode='dag'
|
|
)
|
|
results = await executor.execute_plan(plan, params)
|
|
```
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
max_concurrent_agents: int = 10,
|
|
default_timeout: int = 300
|
|
):
|
|
"""Initialize the executor."""
|
|
self.max_concurrent_agents = max_concurrent_agents
|
|
self.default_timeout = default_timeout
|
|
self.logger = logging.getLogger("agent.executor")
|
|
self._semaphore = asyncio.Semaphore(max_concurrent_agents)
|
|
self._active_executions: Dict[str, AgentResult] = {}
|
|
|
|
async def execute(
|
|
self,
|
|
agent: BaseAgent,
|
|
params: Dict[str, Any]
|
|
) -> AgentResult:
|
|
"""
|
|
Execute a single agent.
|
|
|
|
Args:
|
|
agent: Agent to execute
|
|
params: Parameters to pass to the agent
|
|
|
|
Returns:
|
|
AgentResult containing execution details
|
|
"""
|
|
async with self._semaphore:
|
|
self.logger.info(f"Executing agent: {agent.metadata.name}")
|
|
result = await agent.run(params)
|
|
return result
|
|
|
|
async def execute_parallel(
|
|
self,
|
|
agents: List[BaseAgent],
|
|
params: Dict[str, Any],
|
|
max_concurrency: Optional[int] = None
|
|
) -> List[AgentResult]:
|
|
"""
|
|
Execute multiple agents in parallel.
|
|
|
|
Args:
|
|
agents: List of agents to execute
|
|
params: Parameters to pass to each agent
|
|
max_concurrency: Max number of concurrent executions
|
|
|
|
Returns:
|
|
List of AgentResult objects
|
|
"""
|
|
self.logger.info(
|
|
f"Executing {len(agents)} agents in parallel "
|
|
f"(max_concurrency: {max_concurrency or self.max_concurrent_agents})"
|
|
)
|
|
|
|
if max_concurrency:
|
|
semaphore = asyncio.Semaphore(max_concurrency)
|
|
|
|
async def execute_with_limit(agent):
|
|
async with semaphore:
|
|
return await agent.run(params)
|
|
|
|
tasks = [execute_with_limit(agent) for agent in agents]
|
|
else:
|
|
tasks = [self.execute(agent, params) for agent in agents]
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Convert exceptions to failed results
|
|
processed_results = []
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
processed_results.append(AgentResult(
|
|
agent_name=agents[i].metadata.name,
|
|
execution_id="error",
|
|
status=AgentStatus.FAILED,
|
|
error=str(result),
|
|
started_at=datetime.utcnow(),
|
|
completed_at=datetime.utcnow(),
|
|
duration_seconds=0.0
|
|
))
|
|
else:
|
|
processed_results.append(result)
|
|
|
|
return processed_results
|
|
|
|
async def execute_sequential(
|
|
self,
|
|
agents: List[BaseAgent],
|
|
params: Dict[str, Any],
|
|
stop_on_error: bool = True
|
|
) -> List[AgentResult]:
|
|
"""
|
|
Execute multiple agents sequentially.
|
|
|
|
Args:
|
|
agents: List of agents to execute
|
|
params: Parameters to pass to each agent
|
|
stop_on_error: Stop execution if an agent fails
|
|
|
|
Returns:
|
|
List of AgentResult objects
|
|
"""
|
|
self.logger.info(f"Executing {len(agents)} agents sequentially")
|
|
|
|
results = []
|
|
for agent in agents:
|
|
result = await self.execute(agent, params)
|
|
results.append(result)
|
|
|
|
if stop_on_error and result.status == AgentStatus.FAILED:
|
|
self.logger.warning(
|
|
f"Agent {agent.metadata.name} failed, "
|
|
f"stopping execution"
|
|
)
|
|
break
|
|
|
|
return results
|
|
|
|
async def execute_plan(
|
|
self,
|
|
plan: ExecutionPlan,
|
|
params: Dict[str, Any]
|
|
) -> OrchestrationResult:
|
|
"""
|
|
Execute an execution plan.
|
|
|
|
Args:
|
|
plan: Execution plan
|
|
params: Parameters to pass to agents
|
|
|
|
Returns:
|
|
OrchestrationResult with all execution results
|
|
"""
|
|
plan_id = f"plan_{datetime.utcnow().timestamp()}"
|
|
started_at = datetime.utcnow()
|
|
|
|
self.logger.info(
|
|
f"Executing plan {plan_id} with {len(plan.agents)} agents "
|
|
f"(mode: {plan.mode})"
|
|
)
|
|
|
|
if plan.mode == "parallel":
|
|
results = await self.execute_parallel(
|
|
plan.agents,
|
|
params,
|
|
plan.max_concurrency
|
|
)
|
|
elif plan.mode == "sequential":
|
|
results = await self.execute_sequential(
|
|
plan.agents,
|
|
params,
|
|
plan.stop_on_error
|
|
)
|
|
elif plan.mode == "dag":
|
|
results = await self._execute_dag(plan.agents, params)
|
|
else:
|
|
raise ValueError(f"Unknown execution mode: {plan.mode}")
|
|
|
|
completed_at = datetime.utcnow()
|
|
duration = (completed_at - started_at).total_seconds()
|
|
|
|
succeeded = sum(
|
|
1 for r in results if r.status == AgentStatus.COMPLETED
|
|
)
|
|
failed = sum(
|
|
1 for r in results if r.status == AgentStatus.FAILED
|
|
)
|
|
|
|
overall_status = "completed" if failed == 0 else "partial_failure"
|
|
|
|
self.logger.info(
|
|
f"Plan {plan_id} completed: {succeeded} succeeded, "
|
|
f"{failed} failed (Duration: {duration:.2f}s)"
|
|
)
|
|
|
|
return OrchestrationResult(
|
|
plan_id=plan_id,
|
|
results=results,
|
|
status=overall_status,
|
|
started_at=started_at,
|
|
completed_at=completed_at,
|
|
total_duration_seconds=duration,
|
|
succeeded=succeeded,
|
|
failed=failed
|
|
)
|
|
|
|
async def _execute_dag(
|
|
self,
|
|
agents: List[BaseAgent],
|
|
params: Dict[str, Any]
|
|
) -> List[AgentResult]:
|
|
"""
|
|
Execute agents based on dependency graph (DAG).
|
|
|
|
Args:
|
|
agents: List of agents with dependencies
|
|
params: Parameters to pass to agents
|
|
|
|
Returns:
|
|
List of AgentResult objects
|
|
"""
|
|
# Build dependency graph
|
|
graph = {}
|
|
for agent in agents:
|
|
graph[agent.metadata.name] = agent.metadata.dependencies
|
|
|
|
# Topological sort to determine execution order
|
|
executed = set()
|
|
results = []
|
|
|
|
async def execute_node(agent_name: str):
|
|
if agent_name in executed:
|
|
return
|
|
|
|
# Find the agent
|
|
agent = next(
|
|
(a for a in agents if a.metadata.name == agent_name),
|
|
None
|
|
)
|
|
if not agent:
|
|
return
|
|
|
|
# Execute dependencies first
|
|
for dep in agent.metadata.dependencies:
|
|
await execute_node(dep)
|
|
|
|
# Execute this agent
|
|
result = await self.execute(agent, params)
|
|
results.append(result)
|
|
executed.add(agent_name)
|
|
|
|
# Execute all agents
|
|
for agent in agents:
|
|
await execute_node(agent.metadata.name)
|
|
|
|
return results
|
|
|
|
def get_active_executions(self) -> Dict[str, AgentResult]:
|
|
"""Get currently active agent executions."""
|
|
return self._active_executions.copy()
|
|
|
|
async def cancel_execution(self, execution_id: str) -> bool:
|
|
"""
|
|
Cancel an active agent execution.
|
|
|
|
Args:
|
|
execution_id: ID of the execution to cancel
|
|
|
|
Returns:
|
|
True if cancelled, False if not found
|
|
"""
|
|
if execution_id in self._active_executions:
|
|
# Implementation would require task tracking
|
|
# For now, just log
|
|
self.logger.info(f"Cancelling execution: {execution_id}")
|
|
return True
|
|
return False
|