From 06fe5a03338fc41cdd84cebaf004c396199f5bea Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 11 Sep 2023 23:03:11 +0200 Subject: [PATCH] added sse --- main.go | 60 +++++++++++++++++++++++++++++++- modules/cache/cache.go | 9 +++++ modules/config/config.go | 15 ++++---- modules/loghandler/loghandler.go | 28 +++++++++++++-- modules/structs/sseclient.go | 14 ++++++++ start.sh | 2 ++ 6 files changed, 118 insertions(+), 10 deletions(-) create mode 100644 modules/cache/cache.go create mode 100644 modules/structs/sseclient.go create mode 100755 start.sh diff --git a/main.go b/main.go index a0362cf..9f90ce3 100644 --- a/main.go +++ b/main.go @@ -15,13 +15,20 @@ package main import ( + "bufio" + "fmt" + "jannex/log-manager/modules/cache" "jannex/log-manager/modules/config" + "jannex/log-manager/modules/structs" "jannex/log-manager/modules/utils" "jannex/log-manager/routers/router" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/logger" + futils "github.com/gofiber/fiber/v2/utils" + "github.com/google/uuid" + "github.com/valyala/fasthttp" ) func init() { @@ -36,7 +43,7 @@ func main() { BodyLimit: 100 * 1024 * 1024, }) - app.Use(cors.New()) + app.Use(cors.New(cors.Config{})) app.Use(logger.New(logger.Config{ Format: "${pid} ${locals:requestid} ${status} - ${latency} ${method} ${path}​\n", @@ -44,5 +51,56 @@ func main() { router.SetupRoutes(app) + if config.Cfg.SSEServerEnabled { + app.Get("/v1/log/sse/:logType/:date", func(c *fiber.Ctx) error { + fmt.Println("GET /:logType/:date", c.Params("logType"), c.Params("date")) + + logType := futils.CopyString(c.Params("logType")) + date := futils.CopyString(c.Params("date")) + + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + var sseclient structs.SSEClient + + sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage) + sseclient.LogType = logType + sseclient.Date = date + + clientId := uuid.New() + + cache.SSEClients[clientId] = sseclient + + fmt.Printf("NEW CLIENT %v", sseclient) + fmt.Printf("%d CLIENTS CONNECTED", len(cache.SSEClients)) + + for message := range sseclient.MessageChannel { + fmt.Fprintf(w, "data: %s\n\n", message.Message) + + err := w.Flush() + + if err != nil { + // Refreshing page in web browser will establish a new + // SSE connection, but only (the last) one is alive, so + // dead connections must be closed here. + + for id, sseClient := range cache.SSEClients { + if id == message.ClientId { + close(sseClient.MessageChannel) + delete(cache.SSEClients, id) + fmt.Printf("DELETE clientId: %s", id) + } + } + } + } + })) + + return nil + }) + } + app.Listen(config.Cfg.Host + ":" + config.Cfg.Port) } diff --git a/modules/cache/cache.go b/modules/cache/cache.go new file mode 100644 index 0000000..17a13d8 --- /dev/null +++ b/modules/cache/cache.go @@ -0,0 +1,9 @@ +package cache + +import ( + "jannex/log-manager/modules/structs" + + "github.com/google/uuid" +) + +var SSEClients = make(map[uuid.UUID]structs.SSEClient) diff --git a/modules/config/config.go b/modules/config/config.go index c922ea0..e5c0736 100644 --- a/modules/config/config.go +++ b/modules/config/config.go @@ -10,9 +10,10 @@ import ( var Cfg Config type Config struct { - Host string - Port string - LogFolder string + Host string + Port string + LogFolder string + SSEServerEnabled bool } func LoadConfig() { @@ -25,9 +26,9 @@ func LoadConfig() { } Cfg = Config{ - - Host: os.Getenv("HOST"), - Port: os.Getenv("PORT"), - LogFolder: os.Getenv("LOG_FOLDER"), + Host: os.Getenv("HOST"), + Port: os.Getenv("PORT"), + LogFolder: os.Getenv("LOG_FOLDER"), + SSEServerEnabled: os.Getenv("SSE_SERVER_ENABLED") == "true", } } diff --git a/modules/loghandler/loghandler.go b/modules/loghandler/loghandler.go index 1fc135e..e50df29 100644 --- a/modules/loghandler/loghandler.go +++ b/modules/loghandler/loghandler.go @@ -2,7 +2,9 @@ package loghandler import ( "bufio" + "encoding/json" "fmt" + "jannex/log-manager/modules/cache" "jannex/log-manager/modules/config" "jannex/log-manager/modules/structs" "jannex/log-manager/modules/utils" @@ -33,12 +35,12 @@ func getFileMutex(filePath string) *sync.Mutex { func AddLog(body structs.LogBody) { year, month, day := time.Now().Date() - + date := strconv.Itoa(day) + "-" + strconv.Itoa(int(month)) + "-" + strconv.Itoa(year) logFolder := config.Cfg.LogFolder utils.CreateDirectoryIfNotExists(logFolder + body.Type) - path := logFolder + body.Type + "/" + strconv.Itoa(day) + "-" + strconv.Itoa(int(month)) + "-" + strconv.Itoa(year) + ".log" + path := logFolder + body.Type + "/" + date + ".log" // get the mutex for this file mutex := getFileMutex(path) @@ -61,6 +63,24 @@ func AddLog(body structs.LogBody) { fmt.Println(err) } } + + for clientId, sseClient := range cache.SSEClients { + if sseClient.LogType == body.Type && sseClient.Date == date { + fmt.Println("Sending message to client", clientId) + + marshaledLogs, err := json.Marshal(body.Logs) + + if err != nil { + fmt.Println(err) + return + } + + sseClient.MessageChannel <- structs.SSEClientChannelMessage{ + ClientId: clientId, + Message: marshaledLogs, + } + } + } } func GetAvailableLogFiles(logType string) ([]string, error) { @@ -136,5 +156,9 @@ func GetAvailableLogTypes() []string { availableLogTypes = append(availableLogTypes, file.Name()) } + if len(availableLogTypes) == 0 { + return []string{} + } + return availableLogTypes } diff --git a/modules/structs/sseclient.go b/modules/structs/sseclient.go new file mode 100644 index 0000000..68389d3 --- /dev/null +++ b/modules/structs/sseclient.go @@ -0,0 +1,14 @@ +package structs + +import "github.com/google/uuid" + +type SSEClient struct { + MessageChannel chan SSEClientChannelMessage + LogType string + Date string +} + +type SSEClientChannelMessage struct { + ClientId uuid.UUID + Message []byte +} diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..ee52aa4 --- /dev/null +++ b/start.sh @@ -0,0 +1,2 @@ +screen -dmS log-manager | exit 0 +screen -S log-manager -p 0 -X stuff 'go run main.go\n' \ No newline at end of file