Add graceful shutdown mechanism to StreamHub
Co-authored-by: blackboxprogramming <118287761+blackboxprogramming@users.noreply.github.com>
This commit is contained in:
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
# Binaries
|
||||||
|
beacon
|
||||||
|
|
||||||
|
# Build artifacts
|
||||||
|
*.db
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"blackroad-os-beacon/config"
|
"blackroad-os-beacon/config"
|
||||||
"blackroad-os-beacon/internal/db"
|
"blackroad-os-beacon/internal/db"
|
||||||
@@ -25,8 +27,12 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer database.Close()
|
defer database.Close()
|
||||||
|
|
||||||
|
// Create context for graceful shutdown
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
hub := handler.NewStreamHub()
|
hub := handler.NewStreamHub()
|
||||||
go hub.Run()
|
go hub.Run(ctx)
|
||||||
|
|
||||||
app := fiber.New(fiber.Config{AppName: "blackroad-os-beacon"})
|
app := fiber.New(fiber.Config{AppName: "blackroad-os-beacon"})
|
||||||
|
|
||||||
@@ -42,6 +48,19 @@ func main() {
|
|||||||
|
|
||||||
log.Printf("beacon listening on %s", addr)
|
log.Printf("beacon listening on %s", addr)
|
||||||
|
|
||||||
|
// Setup signal handling for graceful shutdown
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-sigChan
|
||||||
|
log.Println("shutting down gracefully...")
|
||||||
|
cancel() // Cancel the context to stop the hub
|
||||||
|
if err := app.Shutdown(); err != nil {
|
||||||
|
log.Printf("error during shutdown: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
if err := app.Listen(addr); err != nil {
|
if err := app.Listen(addr); err != nil {
|
||||||
log.Fatalf("listen: %v", err)
|
log.Fatalf("listen: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -30,7 +31,9 @@ func TestIngestStoresPingAndBroadcasts(t *testing.T) {
|
|||||||
t.Cleanup(func() { database.Close() })
|
t.Cleanup(func() { database.Close() })
|
||||||
|
|
||||||
hub := NewStreamHub()
|
hub := NewStreamHub()
|
||||||
go hub.Run()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go hub.Run(ctx)
|
||||||
|
|
||||||
app := setupTestApp(t, database, hub, "")
|
app := setupTestApp(t, database, hub, "")
|
||||||
|
|
||||||
@@ -67,7 +70,9 @@ func TestHMACMiddleware(t *testing.T) {
|
|||||||
t.Cleanup(func() { database.Close() })
|
t.Cleanup(func() { database.Close() })
|
||||||
|
|
||||||
hub := NewStreamHub()
|
hub := NewStreamHub()
|
||||||
go hub.Run()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go hub.Run(ctx)
|
||||||
app := setupTestApp(t, database, hub, "secret")
|
app := setupTestApp(t, database, hub, "secret")
|
||||||
|
|
||||||
body := []byte(`{"env":"core","status":"ok"}`)
|
body := []byte(`{"env":"core","status":"ok"}`)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
@@ -30,9 +31,12 @@ func NewStreamHub() *StreamHub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run processes registration and broadcasts.
|
// Run processes registration and broadcasts.
|
||||||
func (h *StreamHub) Run() {
|
func (h *StreamHub) Run(ctx context.Context) {
|
||||||
|
defer h.shutdown()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
case client := <-h.register:
|
case client := <-h.register:
|
||||||
h.clients[client] = struct{}{}
|
h.clients[client] = struct{}{}
|
||||||
case client := <-h.unregister:
|
case client := <-h.unregister:
|
||||||
@@ -50,6 +54,13 @@ func (h *StreamHub) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shutdown closes all client channels when the hub stops.
|
||||||
|
func (h *StreamHub) shutdown() {
|
||||||
|
for client := range h.clients {
|
||||||
|
close(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Broadcast sends message to all subscribers.
|
// Broadcast sends message to all subscribers.
|
||||||
func (h *StreamHub) Broadcast(p model.Ping) {
|
func (h *StreamHub) Broadcast(p model.Ping) {
|
||||||
select {
|
select {
|
||||||
|
|||||||
Reference in New Issue
Block a user