added sse

main
alex 2023-09-11 23:03:11 +02:00
parent cb5ddf10f9
commit 06fe5a0333
6 changed files with 118 additions and 10 deletions

60
main.go
View File

@ -15,13 +15,20 @@
package main
import (
"bufio"
"fmt"
"jannex/log-manager/modules/cache"
"jannex/log-manager/modules/config"
"jannex/log-manager/modules/structs"
"jannex/log-manager/modules/utils"
"jannex/log-manager/routers/router"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
futils "github.com/gofiber/fiber/v2/utils"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
func init() {
@ -36,7 +43,7 @@ func main() {
BodyLimit: 100 * 1024 * 1024,
})
app.Use(cors.New())
app.Use(cors.New(cors.Config{}))
app.Use(logger.New(logger.Config{
Format: "${pid} ${locals:requestid} ${status} - ${latency} ${method} ${path}\n",
@ -44,5 +51,56 @@ func main() {
router.SetupRoutes(app)
if config.Cfg.SSEServerEnabled {
app.Get("/v1/log/sse/:logType/:date", func(c *fiber.Ctx) error {
fmt.Println("GET /:logType/:date", c.Params("logType"), c.Params("date"))
logType := futils.CopyString(c.Params("logType"))
date := futils.CopyString(c.Params("date"))
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
var sseclient structs.SSEClient
sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage)
sseclient.LogType = logType
sseclient.Date = date
clientId := uuid.New()
cache.SSEClients[clientId] = sseclient
fmt.Printf("NEW CLIENT %v", sseclient)
fmt.Printf("%d CLIENTS CONNECTED", len(cache.SSEClients))
for message := range sseclient.MessageChannel {
fmt.Fprintf(w, "data: %s\n\n", message.Message)
err := w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
for id, sseClient := range cache.SSEClients {
if id == message.ClientId {
close(sseClient.MessageChannel)
delete(cache.SSEClients, id)
fmt.Printf("DELETE clientId: %s", id)
}
}
}
}
}))
return nil
})
}
app.Listen(config.Cfg.Host + ":" + config.Cfg.Port)
}

9
modules/cache/cache.go vendored Normal file
View File

@ -0,0 +1,9 @@
package cache
import (
"jannex/log-manager/modules/structs"
"github.com/google/uuid"
)
var SSEClients = make(map[uuid.UUID]structs.SSEClient)

View File

@ -10,9 +10,10 @@ import (
var Cfg Config
type Config struct {
Host string
Port string
LogFolder string
Host string
Port string
LogFolder string
SSEServerEnabled bool
}
func LoadConfig() {
@ -25,9 +26,9 @@ func LoadConfig() {
}
Cfg = Config{
Host: os.Getenv("HOST"),
Port: os.Getenv("PORT"),
LogFolder: os.Getenv("LOG_FOLDER"),
Host: os.Getenv("HOST"),
Port: os.Getenv("PORT"),
LogFolder: os.Getenv("LOG_FOLDER"),
SSEServerEnabled: os.Getenv("SSE_SERVER_ENABLED") == "true",
}
}

View File

@ -2,7 +2,9 @@ package loghandler
import (
"bufio"
"encoding/json"
"fmt"
"jannex/log-manager/modules/cache"
"jannex/log-manager/modules/config"
"jannex/log-manager/modules/structs"
"jannex/log-manager/modules/utils"
@ -33,12 +35,12 @@ func getFileMutex(filePath string) *sync.Mutex {
func AddLog(body structs.LogBody) {
year, month, day := time.Now().Date()
date := strconv.Itoa(day) + "-" + strconv.Itoa(int(month)) + "-" + strconv.Itoa(year)
logFolder := config.Cfg.LogFolder
utils.CreateDirectoryIfNotExists(logFolder + body.Type)
path := logFolder + body.Type + "/" + strconv.Itoa(day) + "-" + strconv.Itoa(int(month)) + "-" + strconv.Itoa(year) + ".log"
path := logFolder + body.Type + "/" + date + ".log"
// get the mutex for this file
mutex := getFileMutex(path)
@ -61,6 +63,24 @@ func AddLog(body structs.LogBody) {
fmt.Println(err)
}
}
for clientId, sseClient := range cache.SSEClients {
if sseClient.LogType == body.Type && sseClient.Date == date {
fmt.Println("Sending message to client", clientId)
marshaledLogs, err := json.Marshal(body.Logs)
if err != nil {
fmt.Println(err)
return
}
sseClient.MessageChannel <- structs.SSEClientChannelMessage{
ClientId: clientId,
Message: marshaledLogs,
}
}
}
}
func GetAvailableLogFiles(logType string) ([]string, error) {
@ -136,5 +156,9 @@ func GetAvailableLogTypes() []string {
availableLogTypes = append(availableLogTypes, file.Name())
}
if len(availableLogTypes) == 0 {
return []string{}
}
return availableLogTypes
}

View File

@ -0,0 +1,14 @@
package structs
import "github.com/google/uuid"
type SSEClient struct {
MessageChannel chan SSEClientChannelMessage
LogType string
Date string
}
type SSEClientChannelMessage struct {
ClientId uuid.UUID
Message []byte
}

2
start.sh Executable file
View File

@ -0,0 +1,2 @@
screen -dmS log-manager | exit 0
screen -S log-manager -p 0 -X stuff 'go run main.go\n'