Files
blackroad-operating-system/backend/app/main.py
Claude 1109603b3f Integrate LEITL Protocol and Cece Cognition Framework into agent system
This commit integrates the LEITL (Live Everyone In The Loop) Protocol and
Cece Cognition Framework into the BlackRoad agent ecosystem, enabling
multi-agent collaboration and advanced reasoning capabilities.

**Changes:**

1. **Cognition Router Integration** (`backend/app/routers/cognition.py`):
   - Fixed import path for orchestration service
   - Exposes full Cece Cognition Framework via REST API
   - Endpoints for single agent execution and multi-agent workflows
   - Supports sequential, parallel, and recursive execution modes

2. **Main App Updates** (`backend/app/main.py`):
   - Added cognition router to imports
   - Registered `/api/cognition` endpoints
   - Added Cognition tag to OpenAPI docs

3. **BaseAgent LEITL Integration** (`agents/base/agent.py`):
   - Added optional LEITL protocol support to base agent class
   - New methods: `enable_leitl()`, `disable_leitl()`, `_leitl_broadcast()`, `_leitl_heartbeat()`
   - Automatic event broadcasting during agent execution lifecycle
   - Events: task.started, task.completed, task.failed
   - Heartbeat support for session keep-alive

4. **AgentRegistry LEITL Support** (`agents/base/registry.py`):
   - Added `enable_leitl_for_all()` - Enable LEITL for all registered agents
   - Added `disable_leitl_for_all()` - Disable LEITL for all agents
   - Added `get_leitl_status()` - Get LEITL status and session IDs
   - Bulk agent session management

**Integration Architecture:**

```
User Request → Cognition API (/api/cognition)
                    ↓
          Orchestration Engine
                    ↓
       ┌────────────┴──────────┐
       ↓                       ↓
  Cece Agent              Other Agents
  (15-step reasoning)     (specialized)
       ↓                       ↓
  LEITL Protocol (if enabled)
       ↓
  Redis PubSub + WebSocket
       ↓
  Other active sessions
```

**New Capabilities:**

1. **Single Agent Execution**: POST /api/cognition/execute
   - Execute Cece, Wasp, Clause, or Codex individually
   - Full reasoning trace and confidence scores

2. **Multi-Agent Workflows**: POST /api/cognition/workflows
   - Orchestrate multiple agents in complex workflows
   - Sequential, parallel, or recursive execution
   - Shared memory and context across agents

3. **LEITL Collaboration**:
   - All agents can now broadcast their activity in real-time
   - Multi-agent sessions can see each other's work
   - Live activity feed via WebSocket
   - Session management with heartbeats

4. **Agent Registry**:
   - Bulk enable/disable LEITL for all agents
   - Query LEITL status across the agent ecosystem
   - Centralized session management

**Testing:**

-  All Python files compile successfully
-  Orchestration engine imports correctly
-  BaseAgent with LEITL integration works
-  AgentRegistry with LEITL support works
-  Cece agent imports and executes
2025-11-18 13:18:06 +00:00

273 lines
8.9 KiB
Python

"""Main FastAPI application"""
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
import time
import os
from app.config import settings
from app.database import async_engine, Base
from app.redis_client import close_redis
from app.routers import (
auth, email, social, video, files, blockchain, ai_chat, devices, miner,
digitalocean, github, huggingface, vscode, games, browser, dashboard,
railway, vercel, stripe, twilio, slack, discord, sentry, api_health, agents,
capture, identity_center, notifications_center, creator, compliance_ops,
search, cloudflare, system, webhooks, prism_static, ip_vault, leitl, cognition
)
from app.services.crypto import rotate_plaintext_wallet_keys
openapi_tags = [
{"name": "prism", "description": "Prism Console - Administrative interface for job queue, events, and metrics"},
{"name": "railway", "description": "Railway deployment management"},
{"name": "vercel", "description": "Vercel project automation"},
{"name": "stripe", "description": "Stripe billing integrations"},
{"name": "twilio", "description": "Twilio messaging"},
{"name": "slack", "description": "Slack workspace automation"},
{"name": "discord", "description": "Discord community integrations"},
{"name": "sentry", "description": "Sentry monitoring hooks"},
{"name": "health", "description": "BlackRoad OS service health"},
{"name": "agents", "description": "BlackRoad Agent Library - 208 AI agents across 10 categories"},
{"name": "cloudflare", "description": "Cloudflare zone, DNS, and Worker scaffolding"},
{"name": "IP Vault", "description": "Cryptographic proof-of-origin for ideas and intellectual property"},
{"name": "LEITL", "description": "Live Everyone In The Loop - Multi-agent collaboration with WebDAV context"},
{"name": "Cognition", "description": "Cece Cognition Framework - 15-step reasoning + 6-step architecture + multi-agent orchestration"},
]
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events"""
# Startup
print("Starting BlackRoad Operating System Backend...")
# Create database tables
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("Database tables created successfully")
print(f"Server running on {settings.ENVIRONMENT} mode")
# Re-encrypt any legacy plaintext wallet keys before serving requests
updated_users, updated_wallets = await rotate_plaintext_wallet_keys()
if updated_users or updated_wallets:
print(
f"Re-encrypted {updated_users} user keys and {updated_wallets} wallet keys"
)
yield
# Shutdown
print("Shutting down...")
await close_redis()
await async_engine.dispose()
print("Shutdown complete")
# Create FastAPI app
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="Backend API for BlackRoad Operating System - A Windows 95-inspired web OS",
lifespan=lifespan,
docs_url="/api/docs",
redoc_url="/api/redoc",
openapi_url="/api/openapi.json",
openapi_tags=openapi_tags,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
"""Add processing time header to responses"""
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
"""Handle 404 errors"""
return JSONResponse(
status_code=404,
content={"detail": "Resource not found"}
)
@app.exception_handler(500)
async def internal_error_handler(request: Request, exc):
"""Handle 500 errors"""
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
# Include routers
app.include_router(auth.router)
app.include_router(email.router)
app.include_router(social.router)
app.include_router(video.router)
app.include_router(files.router)
app.include_router(blockchain.router)
app.include_router(ai_chat.router)
app.include_router(devices.router)
app.include_router(miner.router)
app.include_router(digitalocean.router)
app.include_router(github.router)
app.include_router(huggingface.router)
app.include_router(vscode.router)
app.include_router(games.router)
app.include_router(browser.router)
app.include_router(dashboard.router)
# New API integrations
app.include_router(railway.router)
app.include_router(vercel.router)
app.include_router(stripe.router)
app.include_router(twilio.router)
app.include_router(slack.router)
app.include_router(discord.router)
app.include_router(sentry.router)
app.include_router(capture.router)
app.include_router(identity_center.router)
app.include_router(notifications_center.router)
app.include_router(creator.router)
app.include_router(compliance_ops.router)
app.include_router(search.router)
app.include_router(cloudflare.router)
app.include_router(system.router)
# API health monitoring
app.include_router(api_health.router)
# Agent Library
app.include_router(agents.router)
# IP Vault
app.include_router(ip_vault.router)
# LEITL Protocol - Live Everyone In The Loop
app.include_router(leitl.router)
# Cognition Framework - Cece + Multi-Agent Orchestration
app.include_router(cognition.router)
# GitHub Webhooks (Phase Q automation)
app.include_router(webhooks.router)
# Prism Console (Phase 2.5) - Admin interface at /prism
prism_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "prism-console")
if os.path.exists(prism_dir):
app.mount("/prism", StaticFiles(directory=prism_dir, html=True), name="prism")
print(f"✅ Prism Console mounted at /prism")
# Prism Console (must be before StaticFiles mount to take precedence)
app.include_router(prism_static.router)
# Static file serving for the BlackRoad OS front-end
static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
if os.path.exists(static_dir):
# Mount static files (JS, CSS, images)
app.mount("/static", StaticFiles(directory=static_dir, html=True), name="static")
# Serve index.html at root
@app.get("/")
async def serve_frontend():
"""Serve the BlackRoad OS desktop interface"""
index_path = os.path.join(static_dir, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path)
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"environment": settings.ENVIRONMENT,
"docs": "/api/docs",
"prism": "/prism",
"status": "operational",
"note": "Front-end not found. API is operational."
}
else:
# Fallback if static directory doesn't exist
@app.get("/")
async def root():
"""Root endpoint"""
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"environment": settings.ENVIRONMENT,
"docs": "/api/docs",
"prism": "/prism",
"status": "operational",
"note": "API-only mode. Front-end not deployed."
}
# Health check
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"timestamp": time.time()
}
# API info
@app.get("/api")
async def api_info():
"""API information"""
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"endpoints": {
"auth": "/api/auth",
"email": "/api/email",
"social": "/api/social",
"videos": "/api/videos",
"files": "/api/files",
"blockchain": "/api/blockchain",
"ai_chat": "/api/ai-chat",
"devices": "/api/devices",
"miner": "/api/miner",
"digitalocean": "/api/digitalocean",
"github": "/api/github",
"huggingface": "/api/huggingface",
"vscode": "/api/vscode",
"games": "/api/games",
"browser": "/api/browser",
"dashboard": "/api/dashboard",
"railway": "/api/railway",
"vercel": "/api/vercel",
"stripe": "/api/stripe",
"twilio": "/api/twilio",
"slack": "/api/slack",
"discord": "/api/discord",
"sentry": "/api/sentry",
"health": "/api/health",
"agents": "/api/agents"
},
"documentation": {
"swagger": "/api/docs",
"redoc": "/api/redoc",
"openapi": "/api/openapi.json"
}
}