From 684dab3a565d3298b48b4aafc46ba10b7d9cae71 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 14 Oct 2023 10:34:09 +0200 Subject: [PATCH] added sync mutex for sse clients --- commit_and_push.sh | 7 +++++++ modules/cache/cache.go | 25 ++++++++++++++++++++++++- modules/loghandler/loghandler.go | 2 +- routers/router/api/v1/log/log.go | 9 ++++----- 4 files changed, 36 insertions(+), 7 deletions(-) create mode 100755 commit_and_push.sh diff --git a/commit_and_push.sh b/commit_and_push.sh new file mode 100755 index 0000000..554786f --- /dev/null +++ b/commit_and_push.sh @@ -0,0 +1,7 @@ +git add * + +read -p "Commit message: " commit_message + +git commit -m "$commit_message" + +git push -u origin main \ No newline at end of file diff --git a/modules/cache/cache.go b/modules/cache/cache.go index 17a13d8..755e386 100644 --- a/modules/cache/cache.go +++ b/modules/cache/cache.go @@ -2,8 +2,31 @@ package cache import ( "jannex/log-manager/modules/structs" + "sync" "github.com/google/uuid" ) -var SSEClients = make(map[uuid.UUID]structs.SSEClient) +var sseClients = make(map[uuid.UUID]structs.SSEClient) +var sMu sync.RWMutex + +func AddSSEClient(client structs.SSEClient) { + sMu.Lock() + defer sMu.Unlock() + + sseClients[uuid.New()] = client +} + +func DeleteSSEClient(id uuid.UUID) { + sMu.Lock() + defer sMu.Unlock() + + delete(sseClients, id) +} + +func GetSSEClients() map[uuid.UUID]structs.SSEClient { + sMu.RLock() + defer sMu.RUnlock() + + return sseClients +} diff --git a/modules/loghandler/loghandler.go b/modules/loghandler/loghandler.go index 6490815..7543577 100644 --- a/modules/loghandler/loghandler.go +++ b/modules/loghandler/loghandler.go @@ -72,7 +72,7 @@ func AddLog(body structs.LogBody) { return } - for clientId, sseClient := range cache.SSEClients { + for clientId, sseClient := range cache.GetSSEClients() { if sseClient.LogType == body.Type && sseClient.Date == date { sseClient.MessageChannel <- structs.SSEClientChannelMessage{ ClientId: clientId, diff --git a/routers/router/api/v1/log/log.go b/routers/router/api/v1/log/log.go index 7ad274b..5a167f2 100644 --- a/routers/router/api/v1/log/log.go +++ b/routers/router/api/v1/log/log.go @@ -11,7 +11,6 @@ import ( "github.com/gofiber/fiber/v2" futils "github.com/gofiber/fiber/v2/utils" - "github.com/google/uuid" "github.com/valyala/fasthttp" ) @@ -155,9 +154,9 @@ func SSE(c *fiber.Ctx) error { sseclient.LogType = logType sseclient.Date = date - clientId := uuid.New() + //cache.SSEClients[clientId] = sseclient - cache.SSEClients[clientId] = sseclient + cache.AddSSEClient(sseclient) for message := range sseclient.MessageChannel { fmt.Fprintf(w, "data: %s\n\n", message.Message) @@ -169,10 +168,10 @@ func SSE(c *fiber.Ctx) error { // SSE connection, but only (the last) one is alive, so // dead connections must be closed here. - for id, sseClient := range cache.SSEClients { + for id, sseClient := range cache.GetSSEClients() { if id == message.ClientId { close(sseClient.MessageChannel) - delete(cache.SSEClients, id) + cache.DeleteSSEClient(id) } } }