diff --git a/go.mod b/go.mod index 6f9ad2e..aba4a3f 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/fasthttp/websocket v1.5.0 // indirect github.com/gofiber/fiber/v2 v2.40.1 // indirect github.com/gofiber/websocket/v2 v2.1.2 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.15.12 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect diff --git a/go.sum b/go.sum index 468ff71..da60851 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pI github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= github.com/gofiber/websocket/v2 v2.1.2 h1:EulKyLB/fJgui5+6c8irwEnYQ9FRsrLZfkrq9OfTDGc= github.com/gofiber/websocket/v2 v2.1.2/go.mod h1:S+sKWo0xeC7Wnz5h4/8f6D/NxsrLFIdWDYB3SyVO9pE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= diff --git a/main.go b/main.go index 003a670..eedc40a 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "log" "os" "clickandjoin.app/websocketserver/modules/config" @@ -43,7 +42,7 @@ func main() { socketserver.WebSocketServer(app) if len(os.Args) < 2 { - log.Fatalln("Please specify port") + logrus.Fatalln("Please specify port") } app.Listen("127.0.0.1:" + os.Args[1]) diff --git a/modules/cache/cache.go b/modules/cache/cache.go index b6431f0..100cfcd 100644 --- a/modules/cache/cache.go +++ b/modules/cache/cache.go @@ -2,4 +2,4 @@ package cache import "clickandjoin.app/websocketserver/modules/structs" -var WebSocketClients = make(map[string]*structs.SocketClient) +var SocketClients = make(map[string]*structs.SocketClient) diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index 264088b..d623212 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -1,10 +1,15 @@ package rabbitmq import ( + "context" + "encoding/json" "fmt" "log" + "time" "clickandjoin.app/websocketserver/modules/config" + "clickandjoin.app/websocketserver/modules/structs" + "clickandjoin.app/websocketserver/socketclients" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" ) @@ -95,6 +100,8 @@ func Init() { go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) + + socketclients.BroadcastMessage(d.Body) } }() @@ -129,6 +136,43 @@ func Init() { <-forever */ } +func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { + msg, err := json.Marshal(rabbitMqMessage) + + if err != nil { + logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err) + return err + } + + err = publishMessage(msg, MessagesQueue.Name) + + return err +} + +func publishMessage(body []byte, channelName string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := Channel.PublishWithContext(ctx, + "messages", // exchange + "", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + }) + + if err != nil { + logrus.Errorln("Failed to publish a message, err:", err) + return err + } + + logrus.Printf("[x] Sent %s\n", body) + + return nil +} + /* func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) { q, err := channel.QueueDeclare( diff --git a/modules/structs/SocketClient.go b/modules/structs/SocketClient.go deleted file mode 100644 index 1a8e512..0000000 --- a/modules/structs/SocketClient.go +++ /dev/null @@ -1,7 +0,0 @@ -package structs - -import "github.com/gofiber/websocket/v2" - -type SocketClient struct { - Conn *websocket.Conn -} diff --git a/modules/structs/SocketMessage.go b/modules/structs/SocketMessage.go deleted file mode 100644 index f4a664c..0000000 --- a/modules/structs/SocketMessage.go +++ /dev/null @@ -1,8 +0,0 @@ -package structs - -import "github.com/gofiber/websocket/v2" - -type SocketMessage struct { - Conn *websocket.Conn - Msg []byte -} diff --git a/modules/structs/rabbitmq.go b/modules/structs/rabbitmq.go new file mode 100644 index 0000000..59d5c77 --- /dev/null +++ b/modules/structs/rabbitmq.go @@ -0,0 +1,7 @@ +package structs + +type RabbitMqMessage struct { + Cmd int + Rec string + Body any +} diff --git a/modules/structs/socket.go b/modules/structs/socket.go new file mode 100644 index 0000000..b825436 --- /dev/null +++ b/modules/structs/socket.go @@ -0,0 +1,56 @@ +package structs + +import ( + "encoding/json" + "sync" + + "github.com/gofiber/websocket/v2" + "github.com/sirupsen/logrus" +) + +type SocketClient struct { + Conn *websocket.Conn + connMu sync.Mutex +} + +type SocketMessage struct { + Conn *websocket.Conn + Msg []byte +} + +func (socketClient *SocketClient) SendMessage(message SendSocketMessage) error { + marshaledMessage, err := json.Marshal(message) + + if err != nil { + logrus.Errorln("Failed to send ws message, err:", err) + return err + } + + socketClient.connMu.Lock() + defer socketClient.connMu.Unlock() + + err = socketClient.Conn.WriteMessage(websocket.TextMessage, marshaledMessage) + + if err != nil { + logrus.Errorln("Failed to write ws message, err:", err) + return err + } + + return nil +} + +type SendSocketMessage struct { + Cmd int + Body any +} + +type SocketMessageTest struct { + Cmd int + Body string +} + +type ReceivedSocketMessage struct { + Cmd int + Rec string // represent receiver user id + Body any +} diff --git a/socketclients/socketclients.go b/socketclients/socketclients.go new file mode 100644 index 0000000..5c100f0 --- /dev/null +++ b/socketclients/socketclients.go @@ -0,0 +1,23 @@ +package socketclients + +import ( + "clickandjoin.app/websocketserver/modules/cache" + "clickandjoin.app/websocketserver/modules/structs" + "github.com/gofiber/websocket/v2" +) + +func BroadcastMessage(message []byte) { + for _, client := range cache.SocketClients { + client.Conn.WriteMessage(websocket.TextMessage, message) + } +} + +func IsReceiverConnectedToThisServer(recId string) (isConnected bool, socketClient *structs.SocketClient) { + for id, client := range cache.SocketClients { + if id == recId { + return true, client + } + } + + return false, socketClient +} diff --git a/socketserver/hub.go b/socketserver/hub.go index 41a28fb..85c5348 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -1,8 +1,14 @@ package socketserver import ( + "encoding/json" + + "clickandjoin.app/websocketserver/modules/cache" + "clickandjoin.app/websocketserver/modules/rabbitmq" "clickandjoin.app/websocketserver/modules/structs" + "clickandjoin.app/websocketserver/socketclients" "github.com/gofiber/websocket/v2" + "github.com/google/uuid" "github.com/sirupsen/logrus" ) @@ -14,15 +20,53 @@ func RunHub() { for { select { case newSocketClient := <-register: - logrus.Println("register", newSocketClient) + uuid := uuid.New().String() - newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte("Good afternoon")) + cache.SocketClients[uuid] = newSocketClient + + logrus.Println("register client", uuid) + + // for testing + marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid}) + + if err != nil { + logrus.Errorln("Failed to marshal uuid, err:", err) + } + + newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled)) case data := <-broadcast: - logrus.Println("data", data) + recMsg := structs.ReceivedSocketMessage{} + + err := json.Unmarshal(data.Msg, &recMsg) + + if err != nil { + logrus.Errorln("Failed err:", err) + } + + logrus.Println(recMsg) + + if recMsg.Rec != "" { + isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec) + + // send message to target receiver when connected to this server + if isConnected { + recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body}) + } else { + logrus.Println("rec not found") + + rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + } + } case connection := <-unregister: logrus.Println("unregister", connection) + + for id, client := range cache.SocketClients { + if connection == client.Conn { + delete(cache.SocketClients, id) + } + } } } } diff --git a/socketserver/server.go b/socketserver/server.go index 30d9405..32eef4a 100644 --- a/socketserver/server.go +++ b/socketserver/server.go @@ -1,11 +1,10 @@ package socketserver import ( - "log" - "clickandjoin.app/websocketserver/modules/structs" "github.com/gofiber/fiber/v2" "github.com/gofiber/websocket/v2" + "github.com/sirupsen/logrus" ) func WebSocketServer(app *fiber.App) { @@ -22,16 +21,16 @@ func WebSocketServer(app *fiber.App) { if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Println("read error:", err) + logrus.Errorln("Read err:", err) } return } - if messageType == websocket.BinaryMessage { + if messageType == websocket.TextMessage { broadcast <- structs.SocketMessage{Conn: c, Msg: msg} } else { - log.Println("websocket message received of type", messageType) + logrus.Println("websocket message received of type", messageType) } } }))