diff --git a/dist/script.js b/dist/script.js index 6487718..0371534 100644 --- a/dist/script.js +++ b/dist/script.js @@ -5,42 +5,64 @@ const colorClamp = "#34495e" const colorTime = "#95a5a6" -const serverTypes = [ - "API", - "StorageServer" -] -const serverTypesColors = [ - "yellow", - "orange" -] const messageTypes = [ - "ERR", + "INFO", "DEB", - "INFO" + "ERR" ] const messageTypesColors = [ "#e74c3c", "#95a5a6", "#ecf0f1" ] + const messageColors = [ "#e74c3c", "#95a5a6", "#ecf0f1" ] +const serviceNames = [ + "API", + "MAILER", + "STORAGE", + "SSE", + "WS" +] +const serviceNamesColors = [ + "yellow", + "orange", + "blue", + "green", + "purple" +] + +const serviceTypes = [ + "ALPHA", + "BETA", + "STABLE" +] +const serviceTypesColors = [ + "red", + "orange", + "blue" +] + +const wsAddress = "ws://localhost:50000/ws?auth=aksmdaksdmaskdm213askm" + /** * Begin of code */ const messages = document.getElementById("messages") let autoScroll = true; -function addMessage(serverType, messageType, message) { +function addMessage(messageType, serviceType, serviceName, message, timestamp) { let li = document.createElement("li") - li.innerHTML = formatDate() + + li.innerHTML = formatDate(timestamp) + + formatServiceType(serviceType) + formatMessageType(messageType) + - formatServerType(serverType) + + formatServiceName(serviceName) + formatMessage(message, messageType) messages.appendChild(li) @@ -66,8 +88,9 @@ function clampClosed() { return "] " } -function formatDate() { - const date = new Date() +function formatDate(timestamp) { + const date = new Date(timestamp * 1000) + console.log(date) return clampOn() + "" + @@ -81,12 +104,16 @@ function formatDate() { "" + clampClosed() } -function formatServerType(serverType) { - return clampOn() + "" + serverTypes[serverType] + "" + clampClosed() +function formatServiceType(serviceType) { + return clampOn() + "" + serviceTypes[serviceType] + "" + clampClosed() +} + +function formatServiceName(serviceName) { + return clampOn() + "" + serviceNames[serviceName] + "" + clampClosed() } function formatMessage(message, messageType) { - return ""+message+"" + return "" + message + "" } function formatMessageType(messageType) { @@ -111,4 +138,42 @@ document.getElementById('btn-toggle-scroll').addEventListener('click', () => { scrollState.style.color = "#27ae60" scrollState.innerHTML = "on" } -}) \ No newline at end of file +}) + +// websocket + +let ws = null + +window.onload = () => { + connectWS() +} + +function connectWS() { + ws = new WebSocket(wsAddress) + + ws.onopen = () => { + console.info("ws open") + } + + ws.onmessage = (msg) => { + console.log("rec msg:", msg.data) + + let data = JSON.parse(msg.data) + + console.log("data", data) + + addMessage(data["MessageType"], data["ServiceType"], data["ServiceName"], data["Msg"], data["Timestamp"]) + } + + ws.onclose = (e) => { + console.log("closed", e.reason.code) + if (e.reason.code === 1005) return + + console.log("ws closed", e) + setTimeout(() => connectWS(), 100) + } + + ws.onerror = (err) => { + console.warn("err:", err) + } +} \ No newline at end of file diff --git a/example.env b/example.env index f103247..888e2e9 100644 --- a/example.env +++ b/example.env @@ -2,6 +2,7 @@ DEBUG=false HOST=127.0.0.1 PORT=8080 MANAGEMENTSYSTEM_API_KEY=test +MANAGEMENTSYSTEM_WEBSOCKET_KEY=test # ScyllaDB SCYLLADB_HOST=127.0.0.1 diff --git a/go.mod b/go.mod index a4e99b1..92cb2f4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module clickandjoin.app/managementsystem go 1.18 require ( - git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43 + git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44 github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 github.com/gofiber/fiber/v2 v2.42.0 github.com/joho/godotenv v1.5.0 @@ -12,6 +12,8 @@ require ( require ( github.com/andybalholm/brotli v1.0.4 // indirect + github.com/fasthttp/websocket v1.5.1 // indirect + github.com/gofiber/websocket/v2 v2.1.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect diff --git a/go.sum b/go.sum index 11ad322..a3bdadb 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42 h1:eiZ3QgIEYjziYE git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4= git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43 h1:5CtlOuz7EWOOYU9SyI7tSYrFpNHm4zmwR+tQ2KhH4rw= git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44 h1:eS+W8qa+6VFk1X/Nn6JNzSYn4vGfAFymye2G3P+CLTQ= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= @@ -17,11 +19,15 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fasthttp/websocket v1.5.1 h1:iZsMv5OtZ1E52hhCnlOm/feLCrPhutlrZgvEGcZa1FM= +github.com/fasthttp/websocket v1.5.1/go.mod h1:s+gJkEn38QXLkNfOe/n75Yb8we+VEho1vYqeUYheomw= github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 h1:px9qUCy/RNJNsfCam4m2IxWGxNuimkrioEF0vrrbPsg= github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofiber/fiber/v2 v2.42.0 h1:Fnp7ybWvS+sjNQsFvkhf4G8OhXswvB6Vee8hM/LyS+8= github.com/gofiber/fiber/v2 v2.42.0/go.mod h1:3+SGNjqMh5VQH5Vz2Wdi43zTIV16ktlFd3x3R6O1Zlc= +github.com/gofiber/websocket/v2 v2.1.4 h1:Ki6L7auleAwgi7iRmtUiWKltlbmtkCJ0COtK1nt8L3g= +github.com/gofiber/websocket/v2 v2.1.4/go.mod h1:IC4ZUejlk0kJSaphJ1gjqgKfK9fhw8eoAr3/UdbOzEA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= diff --git a/main.go b/main.go index 2b6cac9..5e955c0 100644 --- a/main.go +++ b/main.go @@ -19,15 +19,17 @@ import ( "clickandjoin.app/managementsystem/modules/rabbitmq" "clickandjoin.app/managementsystem/modules/scylladb" "clickandjoin.app/managementsystem/routers/router" + "clickandjoin.app/managementsystem/socketserver" gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/logger" + "github.com/gofiber/websocket/v2" ) func init() { config.LoadConfig() - gocnjhelper.InitLogger(config.Cfg.Debug, true, false, "", "", "") + gocnjhelper.InitLogger(config.Cfg.Debug, true, false, "", 0, 0) scylladb.InitDatabase() @@ -49,6 +51,27 @@ func main() { })) } + app.Use("/ws", func(c *fiber.Ctx) error { + // IsWebSocketUpgrade returns true if the client + // requested upgrade to the WebSocket protocol. + if websocket.IsWebSocketUpgrade(c) { + authKey := c.Query("auth") + + // no auth query available + if authKey != cfg.ManagementSystemWebSocketKey { + return c.SendStatus(fiber.StatusUnauthorized) + } + + return c.Next() + } + + return fiber.ErrUpgradeRequired + }) + + go socketserver.RunHub() + + socketserver.WebSocketServer(app) + router.SetupRoutes(app) app.Listen(cfg.Host + ":" + cfg.Port) diff --git a/modules/cache/cache.go b/modules/cache/cache.go index a559ed9..903ba9d 100644 --- a/modules/cache/cache.go +++ b/modules/cache/cache.go @@ -1,3 +1,38 @@ package cache -var WebSocketSessions = make(map[string][]string) +import ( + "sync" + + "clickandjoin.app/managementsystem/modules/structs" +) + +var socketClients = make(map[string]*structs.SocketClient) +var mu sync.RWMutex + +func AddSocketClient(clientId string, socketClient *structs.SocketClient) { + mu.Lock() + socketClients[clientId] = socketClient + mu.Unlock() +} + +func DeleteClient(clientId string) { + mu.Lock() + delete(socketClients, clientId) + mu.Unlock() +} + +func GetSocketClients() map[string]*structs.SocketClient { + mu.RLock() + defer mu.RUnlock() + + return socketClients +} + +func GetSocketClient(clientId string) (socketClient *structs.SocketClient, ok bool) { + mu.RLock() + defer mu.RUnlock() + + client, ok := socketClients[clientId] + + return client, ok +} diff --git a/modules/config/config.go b/modules/config/config.go index ee2d07a..21d9ba7 100644 --- a/modules/config/config.go +++ b/modules/config/config.go @@ -11,12 +11,13 @@ import ( var Cfg Config type Config struct { - Debug bool - Host string - Port string - ManagementSystemApiKey string - ScyllaDB ScyllaDB - RabbitMq RabbitMq + Debug bool + Host string + Port string + ManagementSystemApiKey string + ManagementSystemWebSocketKey string + ScyllaDB ScyllaDB + RabbitMq RabbitMq } type ScyllaDB struct { @@ -49,10 +50,11 @@ func LoadConfig() { } cfg := Config{ - Debug: debug, - Host: os.Getenv("HOST"), - Port: os.Getenv("PORT"), - ManagementSystemApiKey: os.Getenv("MANAGEMENTSYSTEM_API_KEY"), + Debug: debug, + Host: os.Getenv("HOST"), + Port: os.Getenv("PORT"), + ManagementSystemApiKey: os.Getenv("MANAGEMENTSYSTEM_API_KEY"), + ManagementSystemWebSocketKey: os.Getenv("MANAGEMENTSYSTEM_WEBSOCKET_KEY"), ScyllaDB: ScyllaDB{ Host: os.Getenv("SCYLLADB_HOST"), Username: os.Getenv("SCYLLADB_USERNAME"), diff --git a/modules/rabbitmq/helper.go b/modules/rabbitmq/helper.go index 04ead68..ed597e0 100644 --- a/modules/rabbitmq/helper.go +++ b/modules/rabbitmq/helper.go @@ -3,6 +3,7 @@ package rabbitmq import ( "encoding/json" + "clickandjoin.app/managementsystem/socketclients" gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" amqp "github.com/rabbitmq/amqp091-go" "github.com/rs/zerolog/log" @@ -51,6 +52,8 @@ func LogsMessagesHandling() { log.Info().Msgf("msg", logMessage) + socketclients.BroadcastLogMessage(logMessage) + msg.Ack(false) } } diff --git a/modules/structs/socketclient.go b/modules/structs/socketclient.go new file mode 100644 index 0000000..42dd49b --- /dev/null +++ b/modules/structs/socketclient.go @@ -0,0 +1,50 @@ +package structs + +import ( + "encoding/json" + "errors" + "sync" + + gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" + "github.com/gofiber/websocket/v2" +) + +type SocketClient struct { + ClientId string + Conn *websocket.Conn + connMu sync.Mutex +} + +type SocketMessage struct { + Conn *websocket.Conn + Msg []byte +} + +func (socketClient *SocketClient) SendMessage(message gocnjhelper.RabbitMqLogMessage) error { + var marshaledMessage []byte + var err error + + marshaledMessage, err = json.Marshal(message) + + if err != nil { + gocnjhelper.LogErrorf("Failed to marshal ws message, err: %s", err) + return err + } + + socketClient.connMu.Lock() + defer socketClient.connMu.Unlock() + + if socketClient.Conn == nil { + gocnjhelper.LogError("Failed to ws message because conn is nil") + return errors.New("ws client conn is nil") + } + + err = socketClient.Conn.WriteMessage(websocket.TextMessage, marshaledMessage) + + if err != nil { + gocnjhelper.LogErrorf("Failed to write ws message, err: %s", err) + return err + } + + return nil +} diff --git a/modules/utils/utils.go b/modules/utils/utils.go new file mode 100644 index 0000000..ec61898 --- /dev/null +++ b/modules/utils/utils.go @@ -0,0 +1,30 @@ +package utils + +import ( + "encoding/json" + + gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" +) + +func MarshalMessage(message any) (marshaledMessage []byte, err error) { + marshaledMessage, err = json.Marshal(message) + + if err != nil { + gocnjhelper.LogErrorf("Failed to marshal send message, err: %s", err) + return nil, err + } + + return marshaledMessage, nil +} + +func UnmarshalReceivedMessage(body []byte, message any) error { + gocnjhelper.LogDebugf("UnmarshalReceivedMessage %s", string(body)) + err := json.Unmarshal(body, &message) + + if err != nil { + gocnjhelper.LogDebugf("Failed to unmarshal received message, err: %s", err) + return err + } + + return nil +} diff --git a/socketclients/socketclients.go b/socketclients/socketclients.go new file mode 100644 index 0000000..94858a9 --- /dev/null +++ b/socketclients/socketclients.go @@ -0,0 +1,12 @@ +package socketclients + +import ( + "clickandjoin.app/managementsystem/modules/cache" + gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" +) + +func BroadcastLogMessage(rabbitMqLogMessage gocnjhelper.RabbitMqLogMessage) { + for _, client := range cache.GetSocketClients() { + client.SendMessage(rabbitMqLogMessage) + } +} diff --git a/socketserver/hub.go b/socketserver/hub.go new file mode 100644 index 0000000..3cf421e --- /dev/null +++ b/socketserver/hub.go @@ -0,0 +1,39 @@ +package socketserver + +import ( + "clickandjoin.app/managementsystem/modules/cache" + "clickandjoin.app/managementsystem/modules/structs" + gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" + "github.com/gofiber/websocket/v2" + "github.com/google/uuid" +) + +var register = make(chan *structs.SocketClient) +var broadcast = make(chan structs.SocketMessage) +var unregister = make(chan *websocket.Conn) + +func RunHub() { + for { + select { + case newSocketClient := <-register: + newSocketClient.ClientId = uuid.New().String() + + gocnjhelper.LogDebugf("clients: %d %s", len(cache.GetSocketClients()), cache.GetSocketClients()) + + gocnjhelper.LogDebugf("REGISTER CLIENT: %s", newSocketClient.ClientId) + + cache.AddSocketClient(newSocketClient.ClientId, newSocketClient) + + case data := <-broadcast: + gocnjhelper.LogDebugf("RECEIVED WEBSOCKET MESSAGE: %s", data.Msg) + + case connection := <-unregister: + for id, client := range cache.GetSocketClients() { + if connection == client.Conn { + gocnjhelper.LogDebugf("UNREGISTER CLIENT: %s", id) + cache.DeleteClient(id) + } + } + } + } +} diff --git a/socketserver/server.go b/socketserver/server.go new file mode 100644 index 0000000..a6d3ea6 --- /dev/null +++ b/socketserver/server.go @@ -0,0 +1,37 @@ +package socketserver + +import ( + "clickandjoin.app/managementsystem/modules/structs" + gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + +func WebSocketServer(app *fiber.App) { + app.Get("/ws", websocket.New(func(c *websocket.Conn) { + defer func() { + unregister <- c + c.Close() + }() + + register <- &structs.SocketClient{Conn: c} + + for { + messageType, msg, err := c.ReadMessage() + + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + gocnjhelper.LogErrorf("Read err: %s", err) + } + + return + } + + if messageType == websocket.TextMessage { + broadcast <- structs.SocketMessage{Conn: c, Msg: msg} + } else { + gocnjhelper.LogDebugf("websocket message received of type %s", messageType) + } + } + })) +}