""" Base Agent Class The foundation for all BlackRoad agents. Provides common functionality, error handling, logging, and execution framework. """ import asyncio import logging import time from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Dict, List, Optional, Callable from uuid import uuid4 class AgentStatus(Enum): """Agent execution status.""" IDLE = "idle" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" @dataclass class AgentMetadata: """Agent metadata and configuration.""" name: str description: str category: str version: str author: str = "BlackRoad" tags: List[str] = field(default_factory=list) dependencies: List[str] = field(default_factory=list) timeout: int = 300 # seconds retry_count: int = 3 retry_delay: int = 5 # seconds @dataclass class AgentResult: """Agent execution result.""" agent_name: str execution_id: str status: AgentStatus data: Optional[Dict[str, Any]] = None error: Optional[str] = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None duration_seconds: Optional[float] = None metadata: Dict[str, Any] = field(default_factory=dict) class BaseAgent(ABC): """ Base class for all BlackRoad agents. Provides: - Lifecycle management (initialize, execute, cleanup) - Error handling and retries - Logging and telemetry - Input validation - Configuration management Example: ```python class MyAgent(BaseAgent): def __init__(self): super().__init__( name='my-agent', description='My custom agent', category='custom', version='1.0.0' ) async def execute(self, params): # Your logic here return {'result': 'success'} ``` """ def __init__( self, name: str, description: str, category: str, version: str, **kwargs ): """Initialize the base agent.""" self.metadata = AgentMetadata( name=name, description=description, category=category, version=version, **kwargs ) self.status = AgentStatus.IDLE self.logger = logging.getLogger(f"agent.{name}") self._execution_id: Optional[str] = None self._hooks: Dict[str, List[Callable]] = { 'before_execute': [], 'after_execute': [], 'on_error': [], 'on_success': [] } @abstractmethod async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Execute the agent logic. Args: params: Input parameters for the agent Returns: Dictionary containing execution results Raises: Exception: If execution fails """ pass async def initialize(self) -> None: """ Initialize the agent before execution. Override this method to add custom initialization logic. """ self.logger.info(f"Initializing agent: {self.metadata.name}") async def cleanup(self) -> None: """ Cleanup after agent execution. Override this method to add custom cleanup logic. """ self.logger.info(f"Cleaning up agent: {self.metadata.name}") def validate_params(self, params: Dict[str, Any]) -> bool: """ Validate input parameters. Override this method to add custom validation logic. Args: params: Parameters to validate Returns: True if valid, False otherwise """ return True async def run(self, params: Dict[str, Any]) -> AgentResult: """ Run the agent with full lifecycle management. Args: params: Input parameters Returns: AgentResult containing execution details """ execution_id = str(uuid4()) self._execution_id = execution_id started_at = datetime.utcnow() self.logger.info( f"Starting agent execution: {self.metadata.name} " f"(ID: {execution_id})" ) try: # Validate params if not self.validate_params(params): raise ValueError("Invalid parameters provided") # Initialize await self.initialize() # Run before hooks await self._run_hooks('before_execute', params) # Execute with retries self.status = AgentStatus.RUNNING result_data = await self._execute_with_retry(params) # Run success hooks await self._run_hooks('on_success', result_data) # Cleanup await self.cleanup() # Run after hooks await self._run_hooks('after_execute', result_data) self.status = AgentStatus.COMPLETED completed_at = datetime.utcnow() duration = (completed_at - started_at).total_seconds() self.logger.info( f"Agent execution completed: {self.metadata.name} " f"(Duration: {duration:.2f}s)" ) return AgentResult( agent_name=self.metadata.name, execution_id=execution_id, status=AgentStatus.COMPLETED, data=result_data, started_at=started_at, completed_at=completed_at, duration_seconds=duration ) except Exception as e: self.status = AgentStatus.FAILED completed_at = datetime.utcnow() duration = (completed_at - started_at).total_seconds() error_msg = f"Agent execution failed: {str(e)}" self.logger.error(error_msg, exc_info=True) # Run error hooks await self._run_hooks('on_error', {'error': str(e)}) return AgentResult( agent_name=self.metadata.name, execution_id=execution_id, status=AgentStatus.FAILED, error=str(e), started_at=started_at, completed_at=completed_at, duration_seconds=duration ) async def _execute_with_retry( self, params: Dict[str, Any] ) -> Dict[str, Any]: """Execute with automatic retries on failure.""" last_exception = None for attempt in range(self.metadata.retry_count): try: return await asyncio.wait_for( self.execute(params), timeout=self.metadata.timeout ) except asyncio.TimeoutError: last_exception = Exception( f"Agent execution timed out after " f"{self.metadata.timeout} seconds" ) self.logger.warning( f"Attempt {attempt + 1}/{self.metadata.retry_count} " f"timed out" ) except Exception as e: last_exception = e self.logger.warning( f"Attempt {attempt + 1}/{self.metadata.retry_count} " f"failed: {str(e)}" ) if attempt < self.metadata.retry_count - 1: await asyncio.sleep(self.metadata.retry_delay) raise last_exception async def _run_hooks( self, hook_name: str, data: Dict[str, Any] ) -> None: """Run registered hooks.""" for hook in self._hooks.get(hook_name, []): try: await hook(self, data) except Exception as e: self.logger.error( f"Hook '{hook_name}' failed: {str(e)}", exc_info=True ) def register_hook( self, hook_name: str, callback: Callable ) -> None: """ Register a lifecycle hook. Args: hook_name: Name of the hook (before_execute, after_execute, etc.) callback: Async function to call """ if hook_name in self._hooks: self._hooks[hook_name].append(callback) else: raise ValueError(f"Unknown hook: {hook_name}") def get_info(self) -> Dict[str, Any]: """Get agent information.""" return { 'name': self.metadata.name, 'description': self.metadata.description, 'category': self.metadata.category, 'version': self.metadata.version, 'author': self.metadata.author, 'tags': self.metadata.tags, 'status': self.status.value, 'dependencies': self.metadata.dependencies }