#!/bin/bash # BlackRoad Memory Real-Time Streaming Server # Live event stream via WebSocket + SSE (Server-Sent Events) MEMORY_DIR="$HOME/.blackroad/memory" STREAM_DIR="$MEMORY_DIR/stream" STREAM_DB="$STREAM_DIR/stream.db" STREAM_PORT="${STREAM_PORT:-9999}" SSE_PORT="${SSE_PORT:-9998}" # Colors GREEN='\033[0;32m' YELLOW='\033[1;33m' RED='\033[0;31m' CYAN='\033[0;36m' PURPLE='\033[0;35m' BLUE='\033[0;34m' NC='\033[0m' init() { echo -e "${PURPLE}╔════════════════════════════════════════════════╗${NC}" echo -e "${PURPLE}║ 🌊 Real-Time Memory Streaming Server ║${NC}" echo -e "${PURPLE}╚════════════════════════════════════════════════╝${NC}\n" mkdir -p "$STREAM_DIR" # Create streaming database sqlite3 "$STREAM_DB" <<'SQL' -- Stream subscribers CREATE TABLE IF NOT EXISTS subscribers ( id INTEGER PRIMARY KEY AUTOINCREMENT, client_id TEXT UNIQUE NOT NULL, connection_type TEXT NOT NULL, -- 'websocket' or 'sse' filters TEXT, -- JSON filters connected_at INTEGER NOT NULL, last_ping INTEGER, status TEXT DEFAULT 'active' ); -- Stream events (last 10k events) CREATE TABLE IF NOT EXISTS stream_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL, event_data TEXT NOT NULL, -- JSON event data timestamp INTEGER NOT NULL, broadcasted INTEGER DEFAULT 0 ); -- Subscriber activity CREATE TABLE IF NOT EXISTS subscriber_activity ( id INTEGER PRIMARY KEY AUTOINCREMENT, client_id TEXT NOT NULL, event_type TEXT NOT NULL, events_received INTEGER DEFAULT 0, last_received INTEGER, FOREIGN KEY (client_id) REFERENCES subscribers(client_id) ); -- Create indexes CREATE INDEX IF NOT EXISTS idx_stream_events_timestamp ON stream_events(timestamp); CREATE INDEX IF NOT EXISTS idx_stream_events_broadcasted ON stream_events(broadcasted); CREATE INDEX IF NOT EXISTS idx_subscribers_status ON subscribers(status); SQL # Create named pipes for streaming [ -p "$STREAM_DIR/memory.fifo" ] || mkfifo "$STREAM_DIR/memory.fifo" [ -p "$STREAM_DIR/events.fifo" ] || mkfifo "$STREAM_DIR/events.fifo" echo -e "${GREEN}✓${NC} Real-time streaming server initialized" echo -e " ${CYAN}Stream DB:${NC} $STREAM_DB" echo -e " ${CYAN}WebSocket Port:${NC} $STREAM_PORT" echo -e " ${CYAN}SSE Port:${NC} $SSE_PORT" } # Watch memory journal for changes watch_journal() { local journal="$MEMORY_DIR/journals/master-journal.jsonl" echo -e "${CYAN}👁️ Watching memory journal for changes...${NC}" # Get current line count local last_line=$(wc -l < "$journal" 2>/dev/null || echo 0) while true; do sleep 1 local current_line=$(wc -l < "$journal" 2>/dev/null || echo 0) if [ "$current_line" -gt "$last_line" ]; then # New entries detected local new_entries=$((current_line - last_line)) echo -e "${GREEN}📥 $new_entries new entries detected${NC}" # Read new entries tail -n "$new_entries" "$journal" | while IFS= read -r entry; do # Broadcast to all subscribers broadcast_event "memory.entry" "$entry" # Store in stream events local timestamp=$(date +%s) sqlite3 "$STREAM_DB" <> "$STREAM_DIR/events.fifo" 2>/dev/null || true # Update broadcast status sqlite3 "$STREAM_DB" </dev/null } | nc -l "$SSE_PORT" 2>/dev/null sleep 0.1 done } # Register subscriber register_subscriber() { local client_id="$1" local connection_type="$2" local filters="${3:-null}" local timestamp=$(date +%s) sqlite3 "$STREAM_DB" < strftime('%s', 'now', '-1 day') GROUP BY event_type ORDER BY count DESC; SQL } # Start all streaming services start_all() { echo -e "${PURPLE}╔════════════════════════════════════════════════╗${NC}" echo -e "${PURPLE}║ 🚀 Starting All Streaming Services ║${NC}" echo -e "${PURPLE}╚════════════════════════════════════════════════╝${NC}\n" # Start journal watcher in background watch_journal & local watch_pid=$! echo -e "${GREEN}✓${NC} Journal watcher started (PID: $watch_pid)" # Start SSE server in background start_sse_server & local sse_pid=$! echo -e "${GREEN}✓${NC} SSE server started (PID: $sse_pid)" # Save PIDs echo "$watch_pid" > "$STREAM_DIR/watch.pid" echo "$sse_pid" > "$STREAM_DIR/sse.pid" echo -e "\n${GREEN}🌊 All streaming services running!${NC}" echo -e " ${CYAN}SSE Endpoint:${NC} http://localhost:$SSE_PORT" echo -e " ${CYAN}Subscribe:${NC} curl http://localhost:$SSE_PORT" echo -e "\n${YELLOW}Press Ctrl+C to stop${NC}" # Wait for processes wait } # Stop all streaming services stop_all() { echo -e "${YELLOW}🛑 Stopping all streaming services...${NC}" # Kill watch process if [ -f "$STREAM_DIR/watch.pid" ]; then local watch_pid=$(cat "$STREAM_DIR/watch.pid") kill "$watch_pid" 2>/dev/null && echo -e "${GREEN}✓${NC} Journal watcher stopped" rm "$STREAM_DIR/watch.pid" fi # Kill SSE server if [ -f "$STREAM_DIR/sse.pid" ]; then local sse_pid=$(cat "$STREAM_DIR/sse.pid") kill "$sse_pid" 2>/dev/null && echo -e "${GREEN}✓${NC} SSE server stopped" rm "$STREAM_DIR/sse.pid" fi # Kill any nc processes on our ports pkill -f "nc -l $SSE_PORT" 2>/dev/null echo -e "${GREEN}✓${NC} All streaming services stopped" } # Test stream test_stream() { echo -e "${PURPLE}╔════════════════════════════════════════════════╗${NC}" echo -e "${PURPLE}║ Stream Test ║${NC}" echo -e "${PURPLE}╚════════════════════════════════════════════════╝${NC}\n" # Generate test event local test_event="{\"action\":\"test\",\"entity\":\"stream-test\",\"timestamp\":$(date +%s)}" echo -e "${CYAN}📤 Broadcasting test event:${NC}" echo "$test_event" | jq '.' 2>/dev/null || echo "$test_event" broadcast_event "memory.test" "$test_event" echo -e "\n${GREEN}✓${NC} Test event broadcasted" echo -e "${YELLOW}💡 To receive: curl http://localhost:$SSE_PORT${NC}" } # Create web client create_web_client() { local client_file="$STREAM_DIR/stream-client.html" cat > "$client_file" <<'HTML' BlackRoad Memory Stream - Live

🌊 BlackRoad Memory Stream

Disconnected
0
Events Received
--
Connected For
Never
Last Event

No events yet. Click "Connect" to start streaming.

HTML echo -e "${GREEN}✓${NC} Web client created: $client_file" echo -e "${CYAN}💡 Open in browser:${NC} open $client_file" } # Main execution case "${1:-help}" in init) init create_web_client ;; start) start_all ;; stop) stop_all ;; watch) watch_journal ;; subscribers) show_subscribers ;; stats) show_stats ;; test) test_stream ;; client) create_web_client ;; help|*) echo -e "${PURPLE}╔════════════════════════════════════════════════╗${NC}" echo -e "${PURPLE}║ 🌊 Real-Time Memory Streaming Server ║${NC}" echo -e "${PURPLE}╚════════════════════════════════════════════════╝${NC}\n" echo "Live event streaming for BlackRoad Memory System" echo "" echo "Usage: $0 COMMAND" echo "" echo "Setup:" echo " init - Initialize streaming server" echo "" echo "Server:" echo " start - Start all streaming services" echo " stop - Stop all streaming services" echo " watch - Watch journal only (no server)" echo "" echo "Monitoring:" echo " subscribers - Show active subscribers" echo " stats - Show stream statistics" echo "" echo "Testing:" echo " test - Broadcast test event" echo " client - Create web client" echo "" echo "Examples:" echo " $0 init" echo " $0 start" echo " $0 stats" echo "" echo "Connect to stream:" echo " curl http://localhost:$SSE_PORT" echo " open ~/.blackroad/memory/stream/stream-client.html" ;; esac