From a11fc2cd5c4ed8d446ca5bc925452130aec21c8d Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 13 Oct 2023 16:47:12 +0200 Subject: [PATCH] moved sse --- main.go | 58 -------------------------------- modules/loghandler/loghandler.go | 2 -- routers/router/api/v1/log/log.go | 53 +++++++++++++++++++++++++++++ routers/router/router.go | 1 + 4 files changed, 54 insertions(+), 60 deletions(-) diff --git a/main.go b/main.go index 4c8a747..c3cc3a3 100644 --- a/main.go +++ b/main.go @@ -15,21 +15,14 @@ package main import ( - "bufio" - "fmt" - "jannex/log-manager/modules/cache" "jannex/log-manager/modules/config" "jannex/log-manager/modules/loghandler" - "jannex/log-manager/modules/structs" "jannex/log-manager/modules/utils" "jannex/log-manager/routers/router" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/logger" - futils "github.com/gofiber/fiber/v2/utils" - "github.com/google/uuid" - "github.com/valyala/fasthttp" ) func init() { @@ -58,56 +51,5 @@ func main() { router.SetupRoutes(app) - if config.Cfg.SSEServerEnabled { - app.Get("/v1/log/sse/:logType/:date", func(c *fiber.Ctx) error { - fmt.Println("GET /:logType/:date", c.Params("logType"), c.Params("date")) - - logType := futils.CopyString(c.Params("logType")) - date := futils.CopyString(c.Params("date")) - - c.Set("Content-Type", "text/event-stream") - c.Set("Cache-Control", "no-cache") - c.Set("Connection", "keep-alive") - c.Set("Transfer-Encoding", "chunked") - - c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - var sseclient structs.SSEClient - - sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage) - sseclient.LogType = logType - sseclient.Date = date - - clientId := uuid.New() - - cache.SSEClients[clientId] = sseclient - - fmt.Printf("NEW CLIENT %v", sseclient) - fmt.Printf("%d CLIENTS CONNECTED", len(cache.SSEClients)) - - for message := range sseclient.MessageChannel { - fmt.Fprintf(w, "data: %s\n\n", message.Message) - - err := w.Flush() - - if err != nil { - // Refreshing page in web browser will establish a new - // SSE connection, but only (the last) one is alive, so - // dead connections must be closed here. - - for id, sseClient := range cache.SSEClients { - if id == message.ClientId { - close(sseClient.MessageChannel) - delete(cache.SSEClients, id) - fmt.Printf("DELETE clientId: %s", id) - } - } - } - } - })) - - return nil - }) - } - app.Listen(config.Cfg.Host + ":" + config.Cfg.Port) } diff --git a/modules/loghandler/loghandler.go b/modules/loghandler/loghandler.go index eea7894..1a0336e 100644 --- a/modules/loghandler/loghandler.go +++ b/modules/loghandler/loghandler.go @@ -67,8 +67,6 @@ func AddLog(body structs.LogBody) { for clientId, sseClient := range cache.SSEClients { if sseClient.LogType == body.Type && sseClient.Date == date { - fmt.Println("Sending message to client", clientId) - marshaledLogs, err := json.Marshal(body.Logs) if err != nil { diff --git a/routers/router/api/v1/log/log.go b/routers/router/api/v1/log/log.go index 4a12396..7ad274b 100644 --- a/routers/router/api/v1/log/log.go +++ b/routers/router/api/v1/log/log.go @@ -1,12 +1,18 @@ package log import ( + "bufio" "fmt" + "jannex/log-manager/modules/cache" + "jannex/log-manager/modules/config" "jannex/log-manager/modules/loghandler" "jannex/log-manager/modules/structs" "jannex/log-manager/modules/utils" "github.com/gofiber/fiber/v2" + futils "github.com/gofiber/fiber/v2/utils" + "github.com/google/uuid" + "github.com/valyala/fasthttp" ) func AddLog(c *fiber.Ctx) error { @@ -128,3 +134,50 @@ func GetAvailableLogTypes(c *fiber.Ctx) error { return c.JSON(logTypes) } + +func SSE(c *fiber.Ctx) error { + if !config.Cfg.SSEServerEnabled { + return c.SendStatus(fiber.StatusNotFound) + } + + logType := futils.CopyString(c.Params("logType")) + date := futils.CopyString(c.Params("date")) + + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + var sseclient structs.SSEClient + + sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage) + sseclient.LogType = logType + sseclient.Date = date + + clientId := uuid.New() + + cache.SSEClients[clientId] = sseclient + + for message := range sseclient.MessageChannel { + fmt.Fprintf(w, "data: %s\n\n", message.Message) + + err := w.Flush() + + if err != nil { + // Refreshing page in web browser will establish a new + // SSE connection, but only (the last) one is alive, so + // dead connections must be closed here. + + for id, sseClient := range cache.SSEClients { + if id == message.ClientId { + close(sseClient.MessageChannel) + delete(cache.SSEClients, id) + } + } + } + } + })) + + return nil +} diff --git a/routers/router/router.go b/routers/router/router.go index c0336b2..44d597a 100644 --- a/routers/router/router.go +++ b/routers/router/router.go @@ -12,6 +12,7 @@ func SetupRoutes(app *fiber.App) { l := v1.Group("/log") l.Post("/", log.AddLog) l.Get("/types", log.GetAvailableLogTypes) + l.Get("/sse/:logType/:date", log.SSE) ls := v1.Group("/logs") ls.Get("/:type", log.GetLog)