Files
blackroad-watch/test-udp-listener.py

57 lines
2.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""BlackRoad Watch - UDP Test Listener
Listens on port 8420 for JSON broadcasts from the M1s Dock.
Run this to verify the firmware is broadcasting correctly."""
import socket
import json
import sys
from datetime import datetime
# UDP port the listener binds to.
PORT = 8420

# ANSI SGR escape sequences used to colorize terminal output.
# "ESC[38;5;<n>m" selects foreground color <n> from the 256-color palette.
_FG_256 = "\033[38;5;{}m"

PINK = _FG_256.format(205)
AMBER = _FG_256.format(214)
BLUE = _FG_256.format(69)
GREEN = _FG_256.format(82)
VIOLET = _FG_256.format(135)
GRAY = _FG_256.format(245)

# "ESC[0m" clears every attribute set by the sequences above.
RESET = "\033[0m"
def _section(msg, key):
    """Return ``msg[key]`` when it is a dict, otherwise an empty dict."""
    value = msg.get(key)
    return value if isinstance(value, dict) else {}


def main():
    """Listen for UDP datagrams on PORT and pretty-print each JSON payload.

    Every datagram is counted. A payload that decodes to a JSON object is
    printed as three lines (sensor, AI, fleet), with '?' standing in for any
    missing field. Anything else (invalid UTF-8, invalid JSON, or JSON that
    is not an object) is printed as a single "Raw:" line showing the first
    80 bytes. The loop runs until Ctrl+C, then reports the packet count.
    """
    # The context manager guarantees the socket is closed on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind(('', PORT))
        print(f"{PINK}[BR]{RESET} BlackRoad Watch UDP Listener")
        print(f"{GRAY} Listening on port {PORT}...{RESET}")
        print(f"{GRAY} Press Ctrl+C to stop{RESET}")
        print()
        count = 0
        try:
            while True:
                # 65535 bytes holds any UDP datagram, so a payload is never
                # truncated into invalid JSON.
                data, addr = sock.recvfrom(65535)
                count += 1
                now = datetime.now().strftime("%H:%M:%S")
                try:
                    msg = json.loads(data.decode('utf-8'))
                except (UnicodeDecodeError, json.JSONDecodeError):
                    msg = None
                # Valid JSON that is not an object (list, number, null, ...)
                # has no sections to show; treat it like undecodable data.
                if not isinstance(msg, dict):
                    print(f"{GRAY}[{now}]{RESET} Raw: {data[:80]}")
                    continue
                sensor = _section(msg, 'sensor')
                ai = _section(msg, 'ai')
                fleet = _section(msg, 'fleet')
                print(f"{GRAY}[{now}]{RESET} #{count} from {AMBER}{addr[0]}:{addr[1]}{RESET}")
                print(f" {AMBER}Sensor{RESET} temp={sensor.get('temp','?')}C hum={sensor.get('hum','?')}% bat={sensor.get('bat','?')}mV up={sensor.get('up','?')}s")
                print(f" {VIOLET}AI{RESET} load={ai.get('load','?')}% temp={ai.get('temp','?')}C infers={ai.get('infers','?')} conf={ai.get('conf','?')}%")
                print(f" {PINK}Fleet{RESET} {fleet.get('on','?')}/{fleet.get('total','?')} devices {fleet.get('agents','?')} agents {GREEN}{fleet.get('green','?')} green{RESET} {fleet.get('tasks','?')} tasks")
                print()
        except KeyboardInterrupt:
            print(f"\n{PINK}[BR]{RESET} Stopped. Received {count} packets.")
# Start the listener only when this file is executed as a script.
if __name__ == "__main__":
    main()