From 34ba77be106fd1e0e77957b0a68306aae35df6ba Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 12 Dec 2022 22:42:58 +0100 Subject: [PATCH] rabbitmq routing --- modules/rabbitmq/rabbitmq.go | 220 ++++++++++++++++++++++++++++------- modules/structs/socket.go | 1 + socketserver/hub.go | 13 ++- 3 files changed, 188 insertions(+), 46 deletions(-) diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index d623212..8916252 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -9,7 +9,6 @@ import ( "clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/structs" - "clickandjoin.app/websocketserver/socketclients" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" ) @@ -41,47 +40,47 @@ func Init() { Channel = ch //declareQueue(ch, "messages", &MessagesQueue) + /* + err = ch.ExchangeDeclare( + "messages", // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) - err = ch.ExchangeDeclare( - "messages", // name - "fanout", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) + if err != nil { + logrus.Fatalln("Failed to declare exchange, err:", err) + } - if err != nil { - logrus.Fatalln("Failed to declare exchange, err:", err) - } + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) - q, err := ch.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "messages", // exchange + false, + nil, + ) - err = ch.QueueBind( - q.Name, // queue name - "", // routing key - "messages", // exchange - false, - nil, - ) + if err != nil { + logrus.Fatalln("Failed to bind queue, err:", err) + } */ - if err != nil { - logrus.Fatalln("Failed to bind queue, err:", err) - } - - msgs, err := ch.Consume( + /*msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack @@ -93,20 +92,38 @@ func Init() { if err != nil { logrus.Fatalln("Failed to register consumer, err:", err) + } */ + + // test + + err = ch.ExchangeDeclare( + "test", // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Fatalln("Failed to declare exchange, err:", err) } - var forever chan struct{} + // listening for messages + /* + var forever chan struct{} - go func() { - for d := range msgs { - log.Printf(" [x] %s", d.Body) + go func() { + for d := range msgs { + log.Printf(" [x] %s", d.Body) - socketclients.BroadcastMessage(d.Body) - } - }() + socketclients.BroadcastMessage(d.Body) + } + }() - log.Printf(" [*] Waiting for messages") - <-forever + log.Printf(" [*] Waiting for messages") + <-forever */ /* @@ -136,6 +153,119 @@ func Init() { <-forever */ } +var msgs chan amqp.Delivery + +func Listener() { + for d := range msgs { + log.Printf(" [x] %s", d.Body) + } +} + +func CreateClientBinding(id string) (queueName string, err error) { + q, err := Channel.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Errorln("Failed to declare queue, err:", err) + return "", err + } + + err = Channel.QueueBind( + q.Name, + id, + "test", + false, + nil, + ) + + if err != nil { + logrus.Println("Failed to bind queue, err:", err) + return "", err + } + + msgs, err := Channel.Consume( + q.Name, // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + + if err != nil { + logrus.Println("Failed to register consumer, err:", err) + return "", err + } + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf(" [x] %s", d.Body) + } + }() + + log.Printf(" [*] Waiting for logs") + <-forever + + logrus.Println("binded", id, q.Name) + + return q.Name, nil +} + +func DeleteClientBinding(qName string, id string) error { + err := Channel.QueueUnbind( + qName, + id, + "test", + nil, + ) + + if err != nil { + logrus.Println("Failed to unbind queue, err:", err) + return err + } + + return nil +} + +func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { + msg, err := json.Marshal(rabbitMqMessage) + + if err != nil { + logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err) + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = Channel.PublishWithContext(ctx, + "test", // exchange + rabbitMqMessage.Rec, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: msg, + }) + + if err != nil { + logrus.Errorln("Failed to publish client msg, err:", err) + return err + } + + return nil + +} + func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { msg, err := json.Marshal(rabbitMqMessage) diff --git a/modules/structs/socket.go b/modules/structs/socket.go index b825436..b3f55fb 100644 --- a/modules/structs/socket.go +++ b/modules/structs/socket.go @@ -11,6 +11,7 @@ import ( type SocketClient struct { Conn *websocket.Conn connMu sync.Mutex + QName string } type SocketMessage struct { diff --git a/socketserver/hub.go b/socketserver/hub.go index 85c5348..ea58759 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -22,6 +22,14 @@ func RunHub() { case newSocketClient := <-register: uuid := uuid.New().String() + qName, err := rabbitmq.CreateClientBinding(uuid) + + logrus.Println("qName", qName) + + if err == nil { + newSocketClient.QName = qName + } + cache.SocketClients[uuid] = newSocketClient logrus.Println("register client", uuid) @@ -55,7 +63,8 @@ func RunHub() { } else { logrus.Println("rec not found") - rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + //rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) + rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) } } @@ -65,6 +74,8 @@ func RunHub() { for id, client := range cache.SocketClients { if connection == client.Conn { delete(cache.SocketClients, id) + + rabbitmq.DeleteClientBinding(client.QName, id) } } }