moved sse

main
alex 2023-10-13 16:47:12 +02:00
parent b2bf49719d
commit a11fc2cd5c
4 changed files with 54 additions and 60 deletions

58
main.go
View File

@ -15,21 +15,14 @@
package main
import (
"bufio"
"fmt"
"jannex/log-manager/modules/cache"
"jannex/log-manager/modules/config"
"jannex/log-manager/modules/loghandler"
"jannex/log-manager/modules/structs"
"jannex/log-manager/modules/utils"
"jannex/log-manager/routers/router"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
futils "github.com/gofiber/fiber/v2/utils"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
func init() {
@ -58,56 +51,5 @@ func main() {
router.SetupRoutes(app)
if config.Cfg.SSEServerEnabled {
app.Get("/v1/log/sse/:logType/:date", func(c *fiber.Ctx) error {
fmt.Println("GET /:logType/:date", c.Params("logType"), c.Params("date"))
logType := futils.CopyString(c.Params("logType"))
date := futils.CopyString(c.Params("date"))
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
var sseclient structs.SSEClient
sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage)
sseclient.LogType = logType
sseclient.Date = date
clientId := uuid.New()
cache.SSEClients[clientId] = sseclient
fmt.Printf("NEW CLIENT %v", sseclient)
fmt.Printf("%d CLIENTS CONNECTED", len(cache.SSEClients))
for message := range sseclient.MessageChannel {
fmt.Fprintf(w, "data: %s\n\n", message.Message)
err := w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
for id, sseClient := range cache.SSEClients {
if id == message.ClientId {
close(sseClient.MessageChannel)
delete(cache.SSEClients, id)
fmt.Printf("DELETE clientId: %s", id)
}
}
}
}
}))
return nil
})
}
app.Listen(config.Cfg.Host + ":" + config.Cfg.Port)
}

View File

@ -67,8 +67,6 @@ func AddLog(body structs.LogBody) {
for clientId, sseClient := range cache.SSEClients {
if sseClient.LogType == body.Type && sseClient.Date == date {
fmt.Println("Sending message to client", clientId)
marshaledLogs, err := json.Marshal(body.Logs)
if err != nil {

View File

@ -1,12 +1,18 @@
package log
import (
"bufio"
"fmt"
"jannex/log-manager/modules/cache"
"jannex/log-manager/modules/config"
"jannex/log-manager/modules/loghandler"
"jannex/log-manager/modules/structs"
"jannex/log-manager/modules/utils"
"github.com/gofiber/fiber/v2"
futils "github.com/gofiber/fiber/v2/utils"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
func AddLog(c *fiber.Ctx) error {
@ -128,3 +134,50 @@ func GetAvailableLogTypes(c *fiber.Ctx) error {
return c.JSON(logTypes)
}
func SSE(c *fiber.Ctx) error {
if !config.Cfg.SSEServerEnabled {
return c.SendStatus(fiber.StatusNotFound)
}
logType := futils.CopyString(c.Params("logType"))
date := futils.CopyString(c.Params("date"))
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
var sseclient structs.SSEClient
sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage)
sseclient.LogType = logType
sseclient.Date = date
clientId := uuid.New()
cache.SSEClients[clientId] = sseclient
for message := range sseclient.MessageChannel {
fmt.Fprintf(w, "data: %s\n\n", message.Message)
err := w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
for id, sseClient := range cache.SSEClients {
if id == message.ClientId {
close(sseClient.MessageChannel)
delete(cache.SSEClients, id)
}
}
}
}
}))
return nil
}

View File

@ -12,6 +12,7 @@ func SetupRoutes(app *fiber.App) {
l := v1.Group("/log")
l.Post("/", log.AddLog)
l.Get("/types", log.GetAvailableLogTypes)
l.Get("/sse/:logType/:date", log.SSE)
ls := v1.Group("/logs")
ls.Get("/:type", log.GetLog)