Files
blackroad-watch/test-udp-sender.py

76 lines
2.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""BlackRoad Watch - UDP Simulator
Simulates M1s Dock broadcasts for testing the iPhone/Watch app
without the physical board. Broadcasts on port 8420."""
import socket
import json
import time
import random
import sys
# UDP port the simulated telemetry packets are broadcast on.
PORT = 8420
# ANSI 256-colour foreground escape sequences used to colour console output.
PINK = "\033[38;5;205m"
AMBER = "\033[38;5;214m"
GREEN = "\033[38;5;82m"
# ANSI sequence that resets text attributes after a coloured span.
RESET = "\033[0m"
def _build_packet(uptime, infers):
    """Build one simulated ``br_watch`` telemetry message.

    Args:
        uptime: Tick counter; drives the cyclic sensor readings and the
            growing fleet counters.
        infers: Running inference count reported in the ``ai`` section.

    Returns:
        A JSON-serialisable dict with ``sensor``, ``ai`` and ``fleet``
        sections.
    """
    # Cyclic readings: temperature repeats every 20 ticks, humidity every 30.
    temp = 23.5 + (uptime % 20 - 10) * 0.1
    hum = 45.0 + (uptime % 30 - 15) * 0.2
    return {
        "type": "br_watch",
        "sensor": {
            "temp": round(temp, 1),
            "hum": round(hum, 1),
            "bat": 3700 - (uptime % 100),
            "up": uptime,
        },
        "ai": {
            # Random jitter around fixed baselines so the values fluctuate.
            "load": 45 + random.randint(-10, 10),
            "temp": 42 + random.randint(-3, 5),
            "infers": infers,
            "conf": 85 + random.randint(-5, 10),
        },
        "fleet": {
            "on": 7,
            "total": 8,
            "agents": 3 + random.randint(0, 2),
            "green": 58,
            "tasks": 2298 + uptime,
            "mem": 156866 + uptime * 3,
            "repos": 1085,
            "cf": 205,
        },
    }


def main():
    """Broadcast one simulated telemetry packet per second until Ctrl+C.

    Opens a UDP socket with ``SO_BROADCAST`` enabled, sends a JSON-encoded
    message to ``<broadcast>`` on ``PORT`` once per second and rewrites a
    one-line status on stdout after each send. On ``KeyboardInterrupt`` it
    prints the number of packets actually sent and returns.
    """
    sent = 0
    # The context manager closes the socket on every exit path, including
    # an unexpected exception from sendto().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        print(f"{PINK}[BR]{RESET} BlackRoad Watch UDP Simulator")
        print(f"{AMBER} Broadcasting on port {PORT}...{RESET}")
        print(" Press Ctrl+C to stop")
        print()
        uptime = 0
        infers = 0
        # A single handler around the whole loop: Ctrl+C ends the run no
        # matter which statement it interrupts.
        try:
            while True:
                uptime += 1
                infers += 1
                msg = _build_packet(uptime, infers)
                data = json.dumps(msg).encode("utf-8")
                sock.sendto(data, ("<broadcast>", PORT))
                # Count only after a successful send so the final summary
                # is not off by one when interrupted before sendto().
                sent += 1
                temp = msg["sensor"]["temp"]
                npu = msg["ai"]["load"]
                # end='\r' keeps the status on one line, overwritten on
                # each tick; flush so it appears despite the missing newline.
                print(
                    f" {GREEN}>>>{RESET} packet #{uptime} temp={temp:.1f} "
                    f"npu={npu}% infers={infers}",
                    end="\r",
                    flush=True,
                )
                time.sleep(1)
        except KeyboardInterrupt:
            print(f"\n\n{PINK}[BR]{RESET} Stopped. Sent {sent} packets.")
# Run the simulator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()