diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index c62b9ce..fdbfbe8 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "log" "time" "clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/structs" + "clickandjoin.app/websocketserver/modules/utils" + "clickandjoin.app/websocketserver/socketclients" "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" @@ -17,9 +18,10 @@ import ( var Conn *amqp.Connection var Channel *amqp.Channel -var MessagesQueue amqp.Queue - -const exchangeWebsocketMessages = "cnj.websocketserver.messages" +const ( + exchangeWebsocketMessages = "cnj.websocketserver.messages" + exchangeBroadcastMessages = "cnj.api.broadcast.messages" +) func getConnectionString() string { cfg := &config.Cfg.RabbitMq @@ -42,6 +44,9 @@ func Init() { Channel = ch + /* + * websocketserver messages + */ // creates a new exchange if one does not already exist err = ch.ExchangeDeclare( @@ -57,6 +62,66 @@ func Init() { if err != nil { logrus.Fatalln("Failed to declare exchange, err:", err) } + + /* + * api broadcast messages + */ + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + exchangeBroadcastMessages, // exchange + false, + nil, + ) + + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + logrus.Fatalln("Failed to register consumer, err:", err) + } + + go func() { + for d := range msgs { + var receivedMessage structs.ReceivedMessage + + err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) + + socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + } + }() + } func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueName string, err error) { @@ -107,11 +172,9 @@ func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueNa go func() { for d := range msgs { - log.Printf(" [x] %s", d.Body) + var receivedMessage structs.ReceivedMessage - var receivedMessage structs.ReceivedSocketMessage - - err := json.Unmarshal(d.Body, &receivedMessage) + err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) if err != nil { logrus.Errorln("Failed to unmarshal received msg, err:", err) @@ -179,6 +242,7 @@ func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { return nil } +/* func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { msg, err := json.Marshal(rabbitMqMessage) @@ -214,7 +278,7 @@ func publishMessage(body []byte, channelName string) error { logrus.Printf("[x] Sent %s\n", body) return nil -} +} */ /* func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) { diff --git a/modules/structs/message.go b/modules/structs/message.go new file mode 100644 index 0000000..38f578f --- /dev/null +++ b/modules/structs/message.go @@ -0,0 +1,7 @@ +package structs + +type ReceivedMessage struct { + Cmd int + Rec string // represent receiver user id + Body any +} diff --git a/modules/structs/socket.go b/modules/structs/socket.go index 2d142b5..9dac536 100644 --- a/modules/structs/socket.go +++ b/modules/structs/socket.go @@ -50,9 +50,3 @@ type SocketMessageTest struct { Cmd int Body string } - -type ReceivedSocketMessage struct { - Cmd int - Rec string // represent receiver user id - Body any -} diff --git a/modules/utils/utils.go b/modules/utils/utils.go new file mode 100644 index 0000000..3e3bb17 --- /dev/null +++ b/modules/utils/utils.go @@ -0,0 +1,30 @@ +package utils + +import ( + "encoding/json" + + "clickandjoin.app/websocketserver/modules/structs" + "github.com/sirupsen/logrus" +) + +func MarshalMessage(message any) (marshaledMessage []byte, err error) { + marshaledMessage, err = json.Marshal(message) + + if err != nil { + logrus.Errorln("Failed to marshal send message, err:", err) + return nil, err + } + + return marshaledMessage, nil +} + +func UnmarshalReceivedMessage(body []byte, receivedMessage *structs.ReceivedMessage) error { + err := json.Unmarshal(body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received message, err:", err) + return err + } + + return nil +} diff --git a/socketclients/socketclients.go b/socketclients/socketclients.go index 5c100f0..28803f2 100644 --- a/socketclients/socketclients.go +++ b/socketclients/socketclients.go @@ -3,12 +3,11 @@ package socketclients import ( "clickandjoin.app/websocketserver/modules/cache" "clickandjoin.app/websocketserver/modules/structs" - "github.com/gofiber/websocket/v2" ) -func BroadcastMessage(message []byte) { +func BroadcastMessage(sendSocketMessage structs.SendSocketMessage) { for _, client := range cache.SocketClients { - client.Conn.WriteMessage(websocket.TextMessage, message) + client.SendMessage(sendSocketMessage) } } diff --git a/socketserver/hub.go b/socketserver/hub.go index 7a2ec9f..f446c4e 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -1,11 +1,10 @@ package socketserver import ( - "encoding/json" - "clickandjoin.app/websocketserver/modules/cache" "clickandjoin.app/websocketserver/modules/rabbitmq" "clickandjoin.app/websocketserver/modules/structs" + "clickandjoin.app/websocketserver/modules/utils" "clickandjoin.app/websocketserver/socketclients" "github.com/gofiber/websocket/v2" "github.com/google/uuid" @@ -25,7 +24,7 @@ func RunHub() { queueName, err := rabbitmq.CreateClientBinding(newSocketClient, uuid) if err != nil { - logrus.Fatalln("Failed to create client binding, err:", err) + logrus.Errorln("Failed to create client binding, err:", err) break } @@ -36,7 +35,7 @@ func RunHub() { logrus.Debugln("REGISTER CLIENT:", uuid) // for testing - marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid}) + marshaled, err := utils.MarshalMessage(structs.SocketMessageTest{Cmd: 99999, Body: uuid}) if err != nil { logrus.Errorln("Failed to marshal uuid, err:", err) @@ -46,30 +45,30 @@ func RunHub() { newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled)) case data := <-broadcast: - recMsg := structs.ReceivedSocketMessage{} + var receivedMessage structs.ReceivedMessage - err := json.Unmarshal(data.Msg, &recMsg) + err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage) if err != nil { - logrus.Errorln("Failed err:", err) + logrus.Errorln("Failed to unmarshal received msg, err:", err) } - logrus.Debugln("RECEIVED MESSAGE:", recMsg) + logrus.Debugln("RECEIVED WEBSOCKET MESSAGE:", receivedMessage) - if recMsg.Rec != "" { - isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec) + if receivedMessage.Rec != "" { + isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(receivedMessage.Rec) // send message to target receiver when connected to this server if isConnected { logrus.Debugln("FORWARDING MESSAGE: receiver is on connected to this server") - recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body}) + recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) } else { logrus.Debugln("FORWARDING MESSAGE: receiver connected to other server") - err = rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + err = rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: receivedMessage.Cmd, Rec: receivedMessage.Rec, Body: receivedMessage.Body}) if err != nil { - logrus.Fatalln("Failed to publish client message, err:", err) + logrus.Errorln("Failed to publish client message, err:", err) } } }