ServerSentEventsServer/main.go

95 lines
2.3 KiB
Go

package main
import (
	"bufio"
	"fmt"
	"log"
	"time"

	"clickandjoin.app/serversenteventsserver/modules/cache"
	"clickandjoin.app/serversenteventsserver/modules/config"
	"clickandjoin.app/serversenteventsserver/modules/rabbitmq"
	"clickandjoin.app/serversenteventsserver/modules/structs"
	gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/cors"
	"github.com/google/uuid"
	"github.com/valyala/fasthttp"
)
// init runs before main: it loads the configuration, initialises the shared
// logger from the loaded settings and kicks off the RabbitMQ setup in its own
// goroutine so that start-up is not blocked by it.
func init() {
	config.LoadConfig()

	settings := config.Cfg

	// The logger receives a connection string built from the RabbitMQ
	// credentials and host found in the configuration.
	connString := gocnjhelper.GetConnectionString(
		settings.RabbitMq.Username,
		settings.RabbitMq.Password,
		settings.RabbitMq.Host,
	)

	// NOTE(review): the meaning of the two boolean flags is defined by
	// gocnjhelper.InitLogger and is not visible from this file.
	gocnjhelper.InitLogger(
		settings.Debug,
		true,
		true,
		connString,
		settings.ServiceName,
		settings.ServiceType,
	)

	go rabbitmq.Init()
}
// main builds the Fiber application, installs the CORS middleware, registers
// the Server-Sent Events endpoint on "/" and then serves on the host and port
// taken from the configuration. A failure to listen terminates the process
// with a logged error.
func main() {
	app := fiber.New()

	// wait so that rabbitmq can connect
	// TODO: better way to handle this
	time.Sleep(500 * time.Millisecond)

	// TODO: origin
	// CORS for external resources
	// NOTE(review): a wildcard origin together with AllowCredentials is not
	// honoured by browsers for credentialed requests, and newer Fiber releases
	// reject this combination at start-up — confirm against the pinned Fiber
	// version and replace "*" with the real origin list.
	app.Use(cors.New(cors.Config{
		AllowOrigins:     "*",
		AllowHeaders:     "Cache-Control",
		AllowCredentials: true,
	}))

	app.Get("/", func(c *fiber.Ctx) error {
		c.Set("Content-Type", "text/event-stream")
		c.Set("Cache-Control", "no-cache")
		c.Set("Connection", "keep-alive")
		c.Set("Transfer-Encoding", "chunked")

		c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
			// Register this connection under a fresh id so that messages
			// can be delivered to it through its channel.
			var sseclient structs.SSEClient
			sseclient.MessageChan = make(chan structs.SSEClientChanMessage)

			clientID := uuid.New()
			cache.SSEClients[clientID] = sseclient

			gocnjhelper.LogDebugf("NEW CLIENT ID: %s", clientID)
			gocnjhelper.LogDebugf("%d CLIENTS CONNECTED", len(cache.SSEClients))

			for message := range sseclient.MessageChan {
				// Write errors on a bufio.Writer are sticky, so a failed
				// Fprintf is reported by the Flush below.
				fmt.Fprintf(w, "data: %s\n\n", message.Message)

				if err := w.Flush(); err != nil {
					// Refreshing page in web browser will establish a new
					// SSE connection, but only (the last) one is alive, so
					// dead connections must be closed here.
					//
					// The entry is looked up by the id this stream was
					// registered with rather than by scanning the whole map
					// for the id carried in the message. The channel is only
					// closed while the entry is still registered, so it is
					// never closed twice from here.
					if _, registered := cache.SSEClients[clientID]; registered {
						delete(cache.SSEClients, clientID)
						close(sseclient.MessageChan)
						gocnjhelper.LogDebugf("DELETE clientId: %s", clientID)
					}

					// The connection is gone; stop writing to it.
					return
				}
			}
		}))

		return nil
	})

	cfg := config.Cfg
	addr := cfg.Host + ":" + cfg.Port

	// Listen blocks while serving; any error it returns (e.g. the address
	// cannot be bound) must not be dropped silently.
	if err := app.Listen(addr); err != nil {
		log.Fatalf("listening on %s: %v", addr, err)
	}
}