added sync mutex for sse clients
parent
438b4bee01
commit
684dab3a56
|
@ -0,0 +1,7 @@
|
||||||
|
git add *
|
||||||
|
|
||||||
|
read -p "Commit message: " commit_message
|
||||||
|
|
||||||
|
git commit -m "$commit_message"
|
||||||
|
|
||||||
|
git push -u origin main
|
|
@ -2,8 +2,31 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"jannex/log-manager/modules/structs"
|
"jannex/log-manager/modules/structs"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
var SSEClients = make(map[uuid.UUID]structs.SSEClient)
|
var sseClients = make(map[uuid.UUID]structs.SSEClient)
|
||||||
|
var sMu sync.RWMutex
|
||||||
|
|
||||||
|
func AddSSEClient(client structs.SSEClient) {
|
||||||
|
sMu.Lock()
|
||||||
|
defer sMu.Unlock()
|
||||||
|
|
||||||
|
sseClients[uuid.New()] = client
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeleteSSEClient(id uuid.UUID) {
|
||||||
|
sMu.Lock()
|
||||||
|
defer sMu.Unlock()
|
||||||
|
|
||||||
|
delete(sseClients, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSSEClients() map[uuid.UUID]structs.SSEClient {
|
||||||
|
sMu.RLock()
|
||||||
|
defer sMu.RUnlock()
|
||||||
|
|
||||||
|
return sseClients
|
||||||
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ func AddLog(body structs.LogBody) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for clientId, sseClient := range cache.SSEClients {
|
for clientId, sseClient := range cache.GetSSEClients() {
|
||||||
if sseClient.LogType == body.Type && sseClient.Date == date {
|
if sseClient.LogType == body.Type && sseClient.Date == date {
|
||||||
sseClient.MessageChannel <- structs.SSEClientChannelMessage{
|
sseClient.MessageChannel <- structs.SSEClientChannelMessage{
|
||||||
ClientId: clientId,
|
ClientId: clientId,
|
||||||
|
|
|
@ -11,7 +11,6 @@ import (
|
||||||
|
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
futils "github.com/gofiber/fiber/v2/utils"
|
futils "github.com/gofiber/fiber/v2/utils"
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -155,9 +154,9 @@ func SSE(c *fiber.Ctx) error {
|
||||||
sseclient.LogType = logType
|
sseclient.LogType = logType
|
||||||
sseclient.Date = date
|
sseclient.Date = date
|
||||||
|
|
||||||
clientId := uuid.New()
|
//cache.SSEClients[clientId] = sseclient
|
||||||
|
|
||||||
cache.SSEClients[clientId] = sseclient
|
cache.AddSSEClient(sseclient)
|
||||||
|
|
||||||
for message := range sseclient.MessageChannel {
|
for message := range sseclient.MessageChannel {
|
||||||
fmt.Fprintf(w, "data: %s\n\n", message.Message)
|
fmt.Fprintf(w, "data: %s\n\n", message.Message)
|
||||||
|
@ -169,10 +168,10 @@ func SSE(c *fiber.Ctx) error {
|
||||||
// SSE connection, but only (the last) one is alive, so
|
// SSE connection, but only (the last) one is alive, so
|
||||||
// dead connections must be closed here.
|
// dead connections must be closed here.
|
||||||
|
|
||||||
for id, sseClient := range cache.SSEClients {
|
for id, sseClient := range cache.GetSSEClients() {
|
||||||
if id == message.ClientId {
|
if id == message.ClientId {
|
||||||
close(sseClient.MessageChannel)
|
close(sseClient.MessageChannel)
|
||||||
delete(cache.SSEClients, id)
|
cache.DeleteSSEClient(id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue