bin/ 230 CLI tools (ask-*, br-*, agent-*, roadid, carpool) scripts/ 99 automation scripts fleet/ Node configs and deployment workers/ Cloudflare Worker sources (roadpay, road-search, squad webhooks) roadc/ RoadC programming language roadnet/ Mesh network (5 APs, WireGuard) operator/ Memory system scripts config/ System configs dotfiles/ Shell configs docs/ Documentation BlackRoad OS — Pave Tomorrow. RoadChain-SHA2048: d1a24f55318d338b RoadChain-Identity: alexa@sovereign RoadChain-Full: d1a24f55318d338b24b60bad7be39286379c76ae5470817482100cb0ddbbcb97e147d07ac7243da0a9f0363e4e5c833d612b9c0df3a3cd20802465420278ef74875a5b77f55af6fe42a931b8b635b3d0d0b6bde9abf33dc42eea52bc03c951406d8cbe49f1a3d29b26a94dade05e9477f34a7d4d4c6ec4005c3c2ac54e73a68440c512c8e83fd9b1fe234750b898ef8f4032c23db173961fe225e67a0432b5293a9714f76c5c57ed5fdf35b9fb40fd73c03ebf88b7253c6a0575f5afb6a6b49b3bda310602fb1ef676859962dad2aebbb2875814b30eee0a8ba195e482d4cbc91d8819e7f38f6db53e8063401649c77bb994371473cabfb917fb53e8cbe73d60
472 lines
13 KiB
Bash
Executable File
472 lines
13 KiB
Bash
Executable File
#!/bin/bash
|
|
# BlackRoad Task Queue
|
|
# Distributed message queue for async task processing
|
|
# Agent: Icarus (b3e01bd9)
|
|
|
|
PINK='\033[38;5;205m'
|
|
GREEN='\033[0;32m'
|
|
BLUE='\033[0;34m'
|
|
YELLOW='\033[1;33m'
|
|
RED='\033[0;31m'
|
|
CYAN='\033[0;36m'
|
|
RESET='\033[0m'
|
|
|
|
QUEUE_DIR="$HOME/.blackroad/queue"
|
|
QUEUE_DB="$QUEUE_DIR/queue.db"
|
|
ALL_NODES=("lucidia" "cecilia" "octavia" "aria" "alice")
|
|
|
|
# Initialize
|
|
init() {
|
|
mkdir -p "$QUEUE_DIR"/{dead-letter,results}
|
|
|
|
sqlite3 "$QUEUE_DB" << 'SQL'
|
|
CREATE TABLE IF NOT EXISTS queues (
|
|
name TEXT PRIMARY KEY,
|
|
max_size INTEGER DEFAULT 10000,
|
|
visibility_timeout INTEGER DEFAULT 30,
|
|
retention_hours INTEGER DEFAULT 24,
|
|
dead_letter_queue TEXT,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id TEXT PRIMARY KEY,
|
|
queue TEXT,
|
|
payload TEXT,
|
|
priority INTEGER DEFAULT 0,
|
|
delay_until DATETIME,
|
|
visibility_timeout DATETIME,
|
|
receive_count INTEGER DEFAULT 0,
|
|
max_receives INTEGER DEFAULT 3,
|
|
status TEXT DEFAULT 'pending',
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
processed_at DATETIME
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS workers (
|
|
id TEXT PRIMARY KEY,
|
|
queue TEXT,
|
|
node TEXT,
|
|
status TEXT DEFAULT 'idle',
|
|
last_heartbeat DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
messages_processed INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_queue ON messages(queue);
|
|
CREATE INDEX IF NOT EXISTS idx_status ON messages(status);
|
|
CREATE INDEX IF NOT EXISTS idx_priority ON messages(priority DESC);
|
|
SQL
|
|
|
|
# Create default queues
|
|
seed_queues
|
|
|
|
echo -e "${GREEN}Task queue initialized${RESET}"
|
|
}
|
|
|
|
# Seed default queues
|
|
seed_queues() {
|
|
sqlite3 "$QUEUE_DB" << 'SQL'
|
|
INSERT OR IGNORE INTO queues (name, max_size, visibility_timeout, retention_hours) VALUES
|
|
('default', 10000, 30, 24),
|
|
('inference', 5000, 300, 12),
|
|
('batch', 1000, 600, 48),
|
|
('priority', 100, 60, 6),
|
|
('dead-letter', 10000, 0, 168);
|
|
SQL
|
|
}
|
|
|
|
# Create queue
|
|
create_queue() {
|
|
local name="$1"
|
|
local max_size="${2:-10000}"
|
|
local timeout="${3:-30}"
|
|
local retention="${4:-24}"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
INSERT OR REPLACE INTO queues (name, max_size, visibility_timeout, retention_hours)
|
|
VALUES ('$name', $max_size, $timeout, $retention)
|
|
"
|
|
|
|
echo -e "${GREEN}Queue created: $name${RESET}"
|
|
}
|
|
|
|
# Send message to queue
|
|
send() {
|
|
local queue="$1"
|
|
local payload="$2"
|
|
local priority="${3:-0}"
|
|
local delay="${4:-0}"
|
|
|
|
local msg_id="msg_$(date +%s%N)_$(openssl rand -hex 4)"
|
|
|
|
local delay_until="NULL"
|
|
[ "$delay" -gt 0 ] && delay_until="datetime('now', '+$delay seconds')"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
INSERT INTO messages (id, queue, payload, priority, delay_until)
|
|
VALUES ('$msg_id', '$queue', '$(echo "$payload" | sed "s/'/''/g")', $priority, $delay_until)
|
|
"
|
|
|
|
echo -e "${GREEN}Sent: $msg_id${RESET}"
|
|
echo "$msg_id"
|
|
}
|
|
|
|
# Send batch messages
|
|
send_batch() {
|
|
local queue="$1"
|
|
local file="$2"
|
|
|
|
local count=0
|
|
while IFS= read -r payload; do
|
|
[ -z "$payload" ] && continue
|
|
send "$queue" "$payload" >/dev/null
|
|
((count++))
|
|
done < "$file"
|
|
|
|
echo -e "${GREEN}Sent $count messages to $queue${RESET}"
|
|
}
|
|
|
|
# Receive message from queue
|
|
receive() {
|
|
local queue="$1"
|
|
local visibility="${2:-30}"
|
|
local wait="${3:-0}"
|
|
|
|
local start=$(date +%s)
|
|
|
|
while true; do
|
|
# Get next available message
|
|
local msg=$(sqlite3 "$QUEUE_DB" "
|
|
SELECT id, payload, receive_count FROM messages
|
|
WHERE queue = '$queue'
|
|
AND status = 'pending'
|
|
AND (delay_until IS NULL OR datetime(delay_until) <= datetime('now'))
|
|
AND (visibility_timeout IS NULL OR datetime(visibility_timeout) <= datetime('now'))
|
|
ORDER BY priority DESC, created_at ASC
|
|
LIMIT 1
|
|
")
|
|
|
|
if [ -n "$msg" ]; then
|
|
local msg_id=$(echo "$msg" | cut -d'|' -f1)
|
|
local payload=$(echo "$msg" | cut -d'|' -f2)
|
|
local receive_count=$(echo "$msg" | cut -d'|' -f3)
|
|
|
|
# Update visibility timeout
|
|
sqlite3 "$QUEUE_DB" "
|
|
UPDATE messages
|
|
SET visibility_timeout = datetime('now', '+$visibility seconds'),
|
|
receive_count = receive_count + 1,
|
|
status = 'processing'
|
|
WHERE id = '$msg_id'
|
|
"
|
|
|
|
echo "$msg_id|$payload"
|
|
return 0
|
|
fi
|
|
|
|
# Long polling
|
|
if [ "$wait" -gt 0 ]; then
|
|
local elapsed=$(($(date +%s) - start))
|
|
if [ "$elapsed" -lt "$wait" ]; then
|
|
sleep 1
|
|
continue
|
|
fi
|
|
fi
|
|
|
|
return 1
|
|
done
|
|
}
|
|
|
|
# Delete message (acknowledge)
|
|
delete() {
|
|
local msg_id="$1"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
UPDATE messages SET status = 'completed', processed_at = datetime('now')
|
|
WHERE id = '$msg_id'
|
|
"
|
|
|
|
echo -e "${GREEN}Deleted: $msg_id${RESET}"
|
|
}
|
|
|
|
# Return message to queue (nack)
|
|
nack() {
|
|
local msg_id="$1"
|
|
local delay="${2:-0}"
|
|
|
|
local delay_until="NULL"
|
|
[ "$delay" -gt 0 ] && delay_until="datetime('now', '+$delay seconds')"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
UPDATE messages
|
|
SET status = 'pending', visibility_timeout = NULL, delay_until = $delay_until
|
|
WHERE id = '$msg_id'
|
|
"
|
|
|
|
echo -e "${YELLOW}Returned to queue: $msg_id${RESET}"
|
|
}
|
|
|
|
# Move to dead letter queue
|
|
dead_letter() {
|
|
local msg_id="$1"
|
|
local reason="$2"
|
|
|
|
local payload=$(sqlite3 "$QUEUE_DB" "SELECT payload FROM messages WHERE id = '$msg_id'")
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
UPDATE messages SET status = 'dead-letter', queue = 'dead-letter' WHERE id = '$msg_id'
|
|
"
|
|
|
|
# Store reason
|
|
echo "{\"original_id\":\"$msg_id\",\"reason\":\"$reason\",\"payload\":$payload}" > "$QUEUE_DIR/dead-letter/${msg_id}.json"
|
|
|
|
echo -e "${RED}Moved to dead-letter: $msg_id${RESET}"
|
|
}
|
|
|
|
# Process messages that exceeded max receives
|
|
process_failed() {
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT id FROM messages
|
|
WHERE status = 'processing'
|
|
AND receive_count >= max_receives
|
|
" | while read -r msg_id; do
|
|
dead_letter "$msg_id" "max_receives_exceeded"
|
|
done
|
|
}
|
|
|
|
# List queues
|
|
list_queues() {
|
|
echo -e "${PINK}=== QUEUES ===${RESET}"
|
|
echo
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT q.name, q.max_size, q.visibility_timeout,
|
|
(SELECT COUNT(*) FROM messages m WHERE m.queue = q.name AND m.status = 'pending'),
|
|
(SELECT COUNT(*) FROM messages m WHERE m.queue = q.name AND m.status = 'processing')
|
|
FROM queues q ORDER BY q.name
|
|
" | while IFS='|' read -r name max timeout pending processing; do
|
|
printf " %-15s pending:%-5d processing:%-5d (max:%d, timeout:%ds)\n" "$name" "$pending" "$processing" "$max" "$timeout"
|
|
done
|
|
}
|
|
|
|
# Queue stats
|
|
stats() {
|
|
local queue="${1:-all}"
|
|
|
|
echo -e "${PINK}=== QUEUE STATISTICS ===${RESET}"
|
|
echo
|
|
|
|
local where=""
|
|
[ "$queue" != "all" ] && where="WHERE queue = '$queue'"
|
|
|
|
echo "Message counts:"
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT queue, status, COUNT(*) FROM messages $where GROUP BY queue, status
|
|
" | while IFS='|' read -r q status count; do
|
|
printf " %-15s %-12s %d\n" "$q" "$status" "$count"
|
|
done
|
|
|
|
echo
|
|
echo "Processing rate (last hour):"
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT queue, COUNT(*) FROM messages
|
|
WHERE status = 'completed'
|
|
AND datetime(processed_at, '+1 hour') > datetime('now')
|
|
$where
|
|
GROUP BY queue
|
|
" | while IFS='|' read -r q count; do
|
|
printf " %-15s %d/hour\n" "$q" "$count"
|
|
done
|
|
}
|
|
|
|
# Peek at messages (don't consume)
|
|
peek() {
|
|
local queue="$1"
|
|
local limit="${2:-5}"
|
|
|
|
echo -e "${PINK}=== PEEK: $queue ===${RESET}"
|
|
echo
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT id, priority, receive_count, status, created_at, substr(payload, 1, 50)
|
|
FROM messages WHERE queue = '$queue'
|
|
ORDER BY priority DESC, created_at ASC
|
|
LIMIT $limit
|
|
" | while IFS='|' read -r id pri recv status created payload; do
|
|
printf " %s [P%d R%d] %s\n" "$id" "$pri" "$recv" "$status"
|
|
echo " ${payload}..."
|
|
done
|
|
}
|
|
|
|
# Purge queue
|
|
purge() {
|
|
local queue="$1"
|
|
|
|
sqlite3 "$QUEUE_DB" "DELETE FROM messages WHERE queue = '$queue'"
|
|
|
|
echo -e "${GREEN}Purged queue: $queue${RESET}"
|
|
}
|
|
|
|
# Worker daemon
|
|
worker() {
|
|
local queue="${1:-default}"
|
|
local handler="${2:-echo}"
|
|
|
|
local worker_id="worker_$(hostname)_$$"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
INSERT OR REPLACE INTO workers (id, queue, node, status)
|
|
VALUES ('$worker_id', '$queue', '$(hostname)', 'running')
|
|
"
|
|
|
|
echo -e "${PINK}╔══════════════════════════════════════════════════════════════╗${RESET}"
|
|
echo -e "${PINK}║ ⚙️ QUEUE WORKER: $worker_id${RESET}"
|
|
echo -e "${PINK}╚══════════════════════════════════════════════════════════════╝${RESET}"
|
|
echo
|
|
echo "Queue: $queue"
|
|
echo "Handler: $handler"
|
|
echo "Press Ctrl+C to stop"
|
|
echo
|
|
|
|
trap "sqlite3 '$QUEUE_DB' \"UPDATE workers SET status = 'stopped' WHERE id = '$worker_id'\"; exit" SIGINT SIGTERM
|
|
|
|
local processed=0
|
|
|
|
while true; do
|
|
# Heartbeat
|
|
sqlite3 "$QUEUE_DB" "
|
|
UPDATE workers SET last_heartbeat = datetime('now'), messages_processed = $processed
|
|
WHERE id = '$worker_id'
|
|
"
|
|
|
|
# Process failed messages
|
|
process_failed
|
|
|
|
# Receive message
|
|
local msg=$(receive "$queue" 60 5)
|
|
|
|
if [ -n "$msg" ]; then
|
|
local msg_id=$(echo "$msg" | cut -d'|' -f1)
|
|
local payload=$(echo "$msg" | cut -d'|' -f2-)
|
|
|
|
echo "[$(date '+%H:%M:%S')] Processing $msg_id..."
|
|
|
|
# Execute handler
|
|
if eval "$handler" "'$payload'" >/dev/null 2>&1; then
|
|
delete "$msg_id" >/dev/null
|
|
((processed++))
|
|
echo " ✓ Success"
|
|
else
|
|
nack "$msg_id" 30 >/dev/null
|
|
echo " ✗ Failed, returning to queue"
|
|
fi
|
|
fi
|
|
done
|
|
}
|
|
|
|
# List workers
|
|
workers() {
|
|
echo -e "${PINK}=== WORKERS ===${RESET}"
|
|
echo
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
SELECT id, queue, node, status, messages_processed, last_heartbeat FROM workers ORDER BY queue
|
|
" | while IFS='|' read -r id queue node status processed heartbeat; do
|
|
local status_color=$GREEN
|
|
[ "$status" = "stopped" ] && status_color=$RED
|
|
|
|
printf " %-30s %-10s %-10s ${status_color}%-8s${RESET} %d msgs\n" "$id" "$queue" "$node" "$status" "$processed"
|
|
done
|
|
}
|
|
|
|
# Clean old messages
|
|
cleanup() {
|
|
local hours="${1:-24}"
|
|
|
|
sqlite3 "$QUEUE_DB" "
|
|
DELETE FROM messages
|
|
WHERE status IN ('completed', 'dead-letter')
|
|
AND datetime(processed_at, '+$hours hours') < datetime('now')
|
|
"
|
|
|
|
echo -e "${GREEN}Cleaned messages older than ${hours}h${RESET}"
|
|
}
|
|
|
|
# Help
|
|
help() {
|
|
echo -e "${PINK}BlackRoad Task Queue${RESET}"
|
|
echo
|
|
echo "Distributed message queue for async processing"
|
|
echo
|
|
echo "Commands:"
|
|
echo " create-queue <name> [max] [timeout] Create queue"
|
|
echo " send <queue> <payload> [pri] [delay] Send message"
|
|
echo " send-batch <queue> <file> Send from file"
|
|
echo " receive <queue> [timeout] [wait] Receive message"
|
|
echo " delete <msg_id> Acknowledge"
|
|
echo " nack <msg_id> [delay] Return to queue"
|
|
echo " list-queues List queues"
|
|
echo " peek <queue> [limit] View messages"
|
|
echo " stats [queue] Statistics"
|
|
echo " worker <queue> [handler] Start worker"
|
|
echo " workers List workers"
|
|
echo " purge <queue> Clear queue"
|
|
echo " cleanup [hours] Remove old"
|
|
echo
|
|
echo "Examples:"
|
|
echo " $0 send inference '{\"prompt\":\"hello\"}'"
|
|
echo " $0 worker inference 'python process.py'"
|
|
echo " $0 send priority 'urgent task' 10"
|
|
}
|
|
|
|
# Ensure initialized
|
|
[ -f "$QUEUE_DB" ] || init >/dev/null
|
|
|
|
case "${1:-help}" in
|
|
init)
|
|
init
|
|
;;
|
|
create-queue|queue)
|
|
create_queue "$2" "$3" "$4" "$5"
|
|
;;
|
|
send|push|enqueue)
|
|
send "$2" "$3" "$4" "$5"
|
|
;;
|
|
send-batch|batch)
|
|
send_batch "$2" "$3"
|
|
;;
|
|
receive|pop|dequeue)
|
|
receive "$2" "$3" "$4"
|
|
;;
|
|
delete|ack)
|
|
delete "$2"
|
|
;;
|
|
nack|reject)
|
|
nack "$2" "$3"
|
|
;;
|
|
list-queues|queues)
|
|
list_queues
|
|
;;
|
|
peek|view)
|
|
peek "$2" "$3"
|
|
;;
|
|
stats)
|
|
stats "$2"
|
|
;;
|
|
worker|consume)
|
|
worker "$2" "$3"
|
|
;;
|
|
workers)
|
|
workers
|
|
;;
|
|
purge|clear)
|
|
purge "$2"
|
|
;;
|
|
cleanup|clean)
|
|
cleanup "$2"
|
|
;;
|
|
*)
|
|
help
|
|
;;
|
|
esac
|