added websocket

alpha
alex 2023-02-18 08:58:58 +01:00
parent 0106ac298b
commit a8d9707870
13 changed files with 337 additions and 32 deletions

103
dist/script.js vendored
View File

@ -5,42 +5,64 @@
const colorClamp = "#34495e"
const colorTime = "#95a5a6"
const serverTypes = [
"API",
"StorageServer"
]
const serverTypesColors = [
"yellow",
"orange"
]
const messageTypes = [
"ERR",
"INFO",
"DEB",
"INFO"
"ERR"
]
const messageTypesColors = [
"#e74c3c",
"#95a5a6",
"#ecf0f1"
]
const messageColors = [
"#e74c3c",
"#95a5a6",
"#ecf0f1"
]
const serviceNames = [
"API",
"MAILER",
"STORAGE",
"SSE",
"WS"
]
const serviceNamesColors = [
"yellow",
"orange",
"blue",
"green",
"purple"
]
const serviceTypes = [
"ALPHA",
"BETA",
"STABLE"
]
const serviceTypesColors = [
"red",
"orange",
"blue"
]
const wsAddress = "ws://localhost:50000/ws?auth=aksmdaksdmaskdm213askm"
/**
* Begin of code
*/
const messages = document.getElementById("messages")
let autoScroll = true;
function addMessage(serverType, messageType, message) {
function addMessage(messageType, serviceType, serviceName, message, timestamp) {
let li = document.createElement("li")
li.innerHTML = formatDate() +
li.innerHTML = formatDate(timestamp) +
formatServiceType(serviceType) +
formatMessageType(messageType) +
formatServerType(serverType) +
formatServiceName(serviceName) +
formatMessage(message, messageType)
messages.appendChild(li)
@ -66,8 +88,9 @@ function clampClosed() {
return "<span style='color: "+colorClamp+"'>]</span> "
}
function formatDate() {
const date = new Date()
function formatDate(timestamp) {
const date = new Date(timestamp * 1000)
console.log(date)
return clampOn() +
"<span style='color: "+colorTime+"'>" +
@ -81,12 +104,16 @@ function formatDate() {
"</span>" + clampClosed()
}
function formatServerType(serverType) {
return clampOn() + "<span style='color: " + serverTypesColors[serverType] + "'>" + serverTypes[serverType] + "</span>" + clampClosed()
function formatServiceType(serviceType) {
return clampOn() + "<span style='color: " + serviceTypesColors[serviceType] + "'>" + serviceTypes[serviceType] + "</span>" + clampClosed()
}
function formatServiceName(serviceName) {
return clampOn() + "<span style='color: " + serviceNamesColors[serviceName] + "'>" + serviceNames[serviceName] + "</span>" + clampClosed()
}
function formatMessage(message, messageType) {
return "<span style='color: "+messageColors[messageType]+"'>"+message+"</span>"
return "<span style='color: " + messageColors[messageType] + "'>" + message + "</span>"
}
function formatMessageType(messageType) {
@ -111,4 +138,42 @@ document.getElementById('btn-toggle-scroll').addEventListener('click', () => {
scrollState.style.color = "#27ae60"
scrollState.innerHTML = "on"
}
})
})
// websocket
let ws = null
window.onload = () => {
connectWS()
}
function connectWS() {
ws = new WebSocket(wsAddress)
ws.onopen = () => {
console.info("ws open")
}
ws.onmessage = (msg) => {
console.log("rec msg:", msg.data)
let data = JSON.parse(msg.data)
console.log("data", data)
addMessage(data["MessageType"], data["ServiceType"], data["ServiceName"], data["Msg"], data["Timestamp"])
}
ws.onclose = (e) => {
console.log("closed", e.reason.code)
if (e.reason.code === 1005) return
console.log("ws closed", e)
setTimeout(() => connectWS(), 100)
}
ws.onerror = (err) => {
console.warn("err:", err)
}
}

View File

@ -2,6 +2,7 @@ DEBUG=false
HOST=127.0.0.1
PORT=8080
MANAGEMENTSYSTEM_API_KEY=test
MANAGEMENTSYSTEM_WEBSOCKET_KEY=test
# ScyllaDB
SCYLLADB_HOST=127.0.0.1

4
go.mod
View File

@ -3,7 +3,7 @@ module clickandjoin.app/managementsystem
go 1.18
require (
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1
github.com/gofiber/fiber/v2 v2.42.0
github.com/joho/godotenv v1.5.0
@ -12,6 +12,8 @@ require (
require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/fasthttp/websocket v1.5.1 // indirect
github.com/gofiber/websocket/v2 v2.1.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect

6
go.sum
View File

@ -8,6 +8,8 @@ git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42 h1:eiZ3QgIEYjziYE
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43 h1:5CtlOuz7EWOOYU9SyI7tSYrFpNHm4zmwR+tQ2KhH4rw=
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44 h1:eS+W8qa+6VFk1X/Nn6JNzSYn4vGfAFymye2G3P+CLTQ=
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.44/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
@ -17,11 +19,15 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fasthttp/websocket v1.5.1 h1:iZsMv5OtZ1E52hhCnlOm/feLCrPhutlrZgvEGcZa1FM=
github.com/fasthttp/websocket v1.5.1/go.mod h1:s+gJkEn38QXLkNfOe/n75Yb8we+VEho1vYqeUYheomw=
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 h1:px9qUCy/RNJNsfCam4m2IxWGxNuimkrioEF0vrrbPsg=
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/fiber/v2 v2.42.0 h1:Fnp7ybWvS+sjNQsFvkhf4G8OhXswvB6Vee8hM/LyS+8=
github.com/gofiber/fiber/v2 v2.42.0/go.mod h1:3+SGNjqMh5VQH5Vz2Wdi43zTIV16ktlFd3x3R6O1Zlc=
github.com/gofiber/websocket/v2 v2.1.4 h1:Ki6L7auleAwgi7iRmtUiWKltlbmtkCJ0COtK1nt8L3g=
github.com/gofiber/websocket/v2 v2.1.4/go.mod h1:IC4ZUejlk0kJSaphJ1gjqgKfK9fhw8eoAr3/UdbOzEA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

25
main.go
View File

@ -19,15 +19,17 @@ import (
"clickandjoin.app/managementsystem/modules/rabbitmq"
"clickandjoin.app/managementsystem/modules/scylladb"
"clickandjoin.app/managementsystem/routers/router"
"clickandjoin.app/managementsystem/socketserver"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/websocket/v2"
)
func init() {
config.LoadConfig()
gocnjhelper.InitLogger(config.Cfg.Debug, true, false, "", "", "")
gocnjhelper.InitLogger(config.Cfg.Debug, true, false, "", 0, 0)
scylladb.InitDatabase()
@ -49,6 +51,27 @@ func main() {
}))
}
app.Use("/ws", func(c *fiber.Ctx) error {
// IsWebSocketUpgrade returns true if the client
// requested upgrade to the WebSocket protocol.
if websocket.IsWebSocketUpgrade(c) {
authKey := c.Query("auth")
// no auth query available
if authKey != cfg.ManagementSystemWebSocketKey {
return c.SendStatus(fiber.StatusUnauthorized)
}
return c.Next()
}
return fiber.ErrUpgradeRequired
})
go socketserver.RunHub()
socketserver.WebSocketServer(app)
router.SetupRoutes(app)
app.Listen(cfg.Host + ":" + cfg.Port)

View File

@ -1,3 +1,38 @@
package cache
var WebSocketSessions = make(map[string][]string)
import (
"sync"
"clickandjoin.app/managementsystem/modules/structs"
)
var socketClients = make(map[string]*structs.SocketClient)
var mu sync.RWMutex
func AddSocketClient(clientId string, socketClient *structs.SocketClient) {
mu.Lock()
socketClients[clientId] = socketClient
mu.Unlock()
}
func DeleteClient(clientId string) {
mu.Lock()
delete(socketClients, clientId)
mu.Unlock()
}
func GetSocketClients() map[string]*structs.SocketClient {
mu.RLock()
defer mu.RUnlock()
return socketClients
}
func GetSocketClient(clientId string) (socketClient *structs.SocketClient, ok bool) {
mu.RLock()
defer mu.RUnlock()
client, ok := socketClients[clientId]
return client, ok
}

View File

@ -11,12 +11,13 @@ import (
var Cfg Config
type Config struct {
Debug bool
Host string
Port string
ManagementSystemApiKey string
ScyllaDB ScyllaDB
RabbitMq RabbitMq
Debug bool
Host string
Port string
ManagementSystemApiKey string
ManagementSystemWebSocketKey string
ScyllaDB ScyllaDB
RabbitMq RabbitMq
}
type ScyllaDB struct {
@ -49,10 +50,11 @@ func LoadConfig() {
}
cfg := Config{
Debug: debug,
Host: os.Getenv("HOST"),
Port: os.Getenv("PORT"),
ManagementSystemApiKey: os.Getenv("MANAGEMENTSYSTEM_API_KEY"),
Debug: debug,
Host: os.Getenv("HOST"),
Port: os.Getenv("PORT"),
ManagementSystemApiKey: os.Getenv("MANAGEMENTSYSTEM_API_KEY"),
ManagementSystemWebSocketKey: os.Getenv("MANAGEMENTSYSTEM_WEBSOCKET_KEY"),
ScyllaDB: ScyllaDB{
Host: os.Getenv("SCYLLADB_HOST"),
Username: os.Getenv("SCYLLADB_USERNAME"),

View File

@ -3,6 +3,7 @@ package rabbitmq
import (
"encoding/json"
"clickandjoin.app/managementsystem/socketclients"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/rs/zerolog/log"
@ -51,6 +52,8 @@ func LogsMessagesHandling() {
log.Info().Msgf("msg", logMessage)
socketclients.BroadcastLogMessage(logMessage)
msg.Ack(false)
}
}

View File

@ -0,0 +1,50 @@
package structs
import (
"encoding/json"
"errors"
"sync"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
"github.com/gofiber/websocket/v2"
)
type SocketClient struct {
ClientId string
Conn *websocket.Conn
connMu sync.Mutex
}
type SocketMessage struct {
Conn *websocket.Conn
Msg []byte
}
func (socketClient *SocketClient) SendMessage(message gocnjhelper.RabbitMqLogMessage) error {
var marshaledMessage []byte
var err error
marshaledMessage, err = json.Marshal(message)
if err != nil {
gocnjhelper.LogErrorf("Failed to marshal ws message, err: %s", err)
return err
}
socketClient.connMu.Lock()
defer socketClient.connMu.Unlock()
if socketClient.Conn == nil {
gocnjhelper.LogError("Failed to ws message because conn is nil")
return errors.New("ws client conn is nil")
}
err = socketClient.Conn.WriteMessage(websocket.TextMessage, marshaledMessage)
if err != nil {
gocnjhelper.LogErrorf("Failed to write ws message, err: %s", err)
return err
}
return nil
}

30
modules/utils/utils.go Normal file
View File

@ -0,0 +1,30 @@
package utils
import (
"encoding/json"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
)
func MarshalMessage(message any) (marshaledMessage []byte, err error) {
marshaledMessage, err = json.Marshal(message)
if err != nil {
gocnjhelper.LogErrorf("Failed to marshal send message, err: %s", err)
return nil, err
}
return marshaledMessage, nil
}
func UnmarshalReceivedMessage(body []byte, message any) error {
gocnjhelper.LogDebugf("UnmarshalReceivedMessage %s", string(body))
err := json.Unmarshal(body, &message)
if err != nil {
gocnjhelper.LogDebugf("Failed to unmarshal received message, err: %s", err)
return err
}
return nil
}

View File

@ -0,0 +1,12 @@
package socketclients
import (
"clickandjoin.app/managementsystem/modules/cache"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
)
func BroadcastLogMessage(rabbitMqLogMessage gocnjhelper.RabbitMqLogMessage) {
for _, client := range cache.GetSocketClients() {
client.SendMessage(rabbitMqLogMessage)
}
}

39
socketserver/hub.go Normal file
View File

@ -0,0 +1,39 @@
package socketserver
import (
"clickandjoin.app/managementsystem/modules/cache"
"clickandjoin.app/managementsystem/modules/structs"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
"github.com/gofiber/websocket/v2"
"github.com/google/uuid"
)
var register = make(chan *structs.SocketClient)
var broadcast = make(chan structs.SocketMessage)
var unregister = make(chan *websocket.Conn)
func RunHub() {
for {
select {
case newSocketClient := <-register:
newSocketClient.ClientId = uuid.New().String()
gocnjhelper.LogDebugf("clients: %d %s", len(cache.GetSocketClients()), cache.GetSocketClients())
gocnjhelper.LogDebugf("REGISTER CLIENT: %s", newSocketClient.ClientId)
cache.AddSocketClient(newSocketClient.ClientId, newSocketClient)
case data := <-broadcast:
gocnjhelper.LogDebugf("RECEIVED WEBSOCKET MESSAGE: %s", data.Msg)
case connection := <-unregister:
for id, client := range cache.GetSocketClients() {
if connection == client.Conn {
gocnjhelper.LogDebugf("UNREGISTER CLIENT: %s", id)
cache.DeleteClient(id)
}
}
}
}
}

37
socketserver/server.go Normal file
View File

@ -0,0 +1,37 @@
package socketserver
import (
"clickandjoin.app/managementsystem/modules/structs"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
)
func WebSocketServer(app *fiber.App) {
app.Get("/ws", websocket.New(func(c *websocket.Conn) {
defer func() {
unregister <- c
c.Close()
}()
register <- &structs.SocketClient{Conn: c}
for {
messageType, msg, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
gocnjhelper.LogErrorf("Read err: %s", err)
}
return
}
if messageType == websocket.TextMessage {
broadcast <- structs.SocketMessage{Conn: c, Msg: msg}
} else {
gocnjhelper.LogDebugf("websocket message received of type %s", messageType)
}
}
}))
}