From cf5e6a29047e83dd506e38d13fa17cac6841a5c6 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 17 Dec 2022 23:47:51 +0100 Subject: [PATCH] message handling --- main.go | 5 + modules/rabbitmq/rabbitmq.go | 209 ++++++++++------------------------- modules/structs/socket.go | 7 +- socketserver/hub.go | 31 +++--- 4 files changed, 88 insertions(+), 164 deletions(-) diff --git a/main.go b/main.go index eedc40a..f103b03 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "os" + "time" "clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/rabbitmq" @@ -37,6 +38,10 @@ func main() { return fiber.ErrUpgradeRequired }) + // wait so that rabbitmq can connect + // TODO: better way to handle this + time.Sleep(500 * time.Millisecond) + go socketserver.RunHub() socketserver.WebSocketServer(app) diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index 8916252..c62b9ce 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -9,6 +9,7 @@ import ( "clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/structs" + "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" ) @@ -18,6 +19,8 @@ var Channel *amqp.Channel var MessagesQueue amqp.Queue +const exchangeWebsocketMessages = "cnj.websocketserver.messages" + func getConnectionString() string { cfg := &config.Cfg.RabbitMq @@ -39,129 +42,24 @@ func Init() { Channel = ch - //declareQueue(ch, "messages", &MessagesQueue) - /* - err = ch.ExchangeDeclare( - "messages", // name - "fanout", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) - - if err != nil { - logrus.Fatalln("Failed to declare exchange, err:", err) - } - - q, err := ch.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } - - err = ch.QueueBind( - q.Name, // queue name - "", // routing key - "messages", // exchange - false, - nil, - ) - - if err != nil { - logrus.Fatalln("Failed to bind queue, err:", err) - } */ - - /*msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - - if err != nil { - logrus.Fatalln("Failed to register consumer, err:", err) - } */ - - // test + // creates a new exchange if one does not already exist err = ch.ExchangeDeclare( - "test", // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + exchangeWebsocketMessages, // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { logrus.Fatalln("Failed to declare exchange, err:", err) } - - // listening for messages - /* - var forever chan struct{} - - go func() { - for d := range msgs { - log.Printf(" [x] %s", d.Body) - - socketclients.BroadcastMessage(d.Body) - } - }() - - log.Printf(" [*] Waiting for messages") - <-forever */ - - /* - - msgs, err := ch.Consume( - MessagesQueue.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - - if err != nil { - logrus.Fatalln("Failed to consume msgs, err:", err) - } - - var forever chan struct{} - - go func() { - for d := range msgs { - logrus.Println("Received msg", d.Body, string(d.Body)) - } - }() - - logrus.Println("Waiting for messages") - <-forever */ } -var msgs chan amqp.Delivery - -func Listener() { - for d := range msgs { - log.Printf(" [x] %s", d.Body) - } -} - -func CreateClientBinding(id string) (queueName string, err error) { +func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueName string, err error) { q, err := Channel.QueueDeclare( "", // name false, // durable @@ -177,59 +75,75 @@ func CreateClientBinding(id string) (queueName string, err error) { } err = Channel.QueueBind( - q.Name, - id, - "test", - false, - nil, + q.Name, // name + id, // key + exchangeWebsocketMessages, // exchange + false, // no-wait + nil, // arguments ) if err != nil { - logrus.Println("Failed to bind queue, err:", err) + logrus.Errorln("Failed to bind queue, err:", err) return "", err } + // set an ID for the consumer to delete the consumer after the socket user has disconnected + socketClient.RabbitMqConsumerId = uuid.New().String() + msgs, err := Channel.Consume( - q.Name, // queue - "", // consumer - true, // auto ack - false, // exclusive - false, // no local - false, // no wait - nil, // args + q.Name, // queue + socketClient.RabbitMqConsumerId, // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args ) if err != nil { - logrus.Println("Failed to register consumer, err:", err) + logrus.Errorln("Failed to register consumer, err:", err) return "", err } - var forever chan struct{} - go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) + + var receivedMessage structs.ReceivedSocketMessage + + err := json.Unmarshal(d.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage) + + err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + + if err != nil { + logrus.Errorln("Failed to send message to client, err:", err) + } } }() - log.Printf(" [*] Waiting for logs") - <-forever - - logrus.Println("binded", id, q.Name) - return q.Name, nil } -func DeleteClientBinding(qName string, id string) error { - err := Channel.QueueUnbind( - qName, - id, - "test", - nil, - ) +func DeleteClient(consumerId string, qName string) error { + // delete consumer + err := Channel.Cancel(consumerId, false) if err != nil { - logrus.Println("Failed to unbind queue, err:", err) + logrus.Errorln("Failed to delete consumer, err:", err) + return err + } + + // delete queue and queue binding + _, err = Channel.QueueDelete(qName, false, true, false) + + if err != nil { + logrus.Errorln("Failed to delete queue, err:", err) return err } @@ -248,10 +162,10 @@ func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { defer cancel() err = Channel.PublishWithContext(ctx, - "test", // exchange - rabbitMqMessage.Rec, // routing key - false, // mandatory - false, // immediate + exchangeWebsocketMessages, // exchange + rabbitMqMessage.Rec, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: msg, @@ -263,7 +177,6 @@ func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { } return nil - } func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { diff --git a/modules/structs/socket.go b/modules/structs/socket.go index b3f55fb..2d142b5 100644 --- a/modules/structs/socket.go +++ b/modules/structs/socket.go @@ -9,9 +9,10 @@ import ( ) type SocketClient struct { - Conn *websocket.Conn - connMu sync.Mutex - QName string + Conn *websocket.Conn + connMu sync.Mutex + RabbitMqQueueName string + RabbitMqConsumerId string } type SocketMessage struct { diff --git a/socketserver/hub.go b/socketserver/hub.go index ea58759..7a2ec9f 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -22,23 +22,25 @@ func RunHub() { case newSocketClient := <-register: uuid := uuid.New().String() - qName, err := rabbitmq.CreateClientBinding(uuid) + queueName, err := rabbitmq.CreateClientBinding(newSocketClient, uuid) - logrus.Println("qName", qName) - - if err == nil { - newSocketClient.QName = qName + if err != nil { + logrus.Fatalln("Failed to create client binding, err:", err) + break } + newSocketClient.RabbitMqQueueName = queueName + cache.SocketClients[uuid] = newSocketClient - logrus.Println("register client", uuid) + logrus.Debugln("REGISTER CLIENT:", uuid) // for testing marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid}) if err != nil { logrus.Errorln("Failed to marshal uuid, err:", err) + break } newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled)) @@ -52,30 +54,33 @@ func RunHub() { logrus.Errorln("Failed err:", err) } - logrus.Println(recMsg) + logrus.Debugln("RECEIVED MESSAGE:", recMsg) if recMsg.Rec != "" { isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec) // send message to target receiver when connected to this server if isConnected { + logrus.Debugln("FORWARDING MESSAGE: receiver is on connected to this server") recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body}) } else { - logrus.Println("rec not found") + logrus.Debugln("FORWARDING MESSAGE: receiver connected to other server") - //rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) - rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + err = rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + + if err != nil { + logrus.Fatalln("Failed to publish client message, err:", err) + } } } case connection := <-unregister: - logrus.Println("unregister", connection) - for id, client := range cache.SocketClients { if connection == client.Conn { + logrus.Debugln("UNREGISTER CLIENT:", id) delete(cache.SocketClients, id) - rabbitmq.DeleteClientBinding(client.QName, id) + rabbitmq.DeleteClient(client.RabbitMqConsumerId, client.RabbitMqQueueName) } } }