From eb1a18d03a78eda3f0cf0d2845cee7d7f7624544 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 8 Jan 2023 16:23:15 +0100 Subject: [PATCH] rabbitmq auto reconnect after connection lost --- go.mod | 1 + go.sum | 40 +++++ modules/rabbitmq/helper.go | 220 +++++++++++++++++++++++ modules/rabbitmq/rabbitmq.go | 339 ++++++++--------------------------- modules/structs/socket.go | 2 + socketserver/hub.go | 7 +- 6 files changed, 336 insertions(+), 273 deletions(-) create mode 100644 modules/rabbitmq/helper.go diff --git a/go.mod b/go.mod index 11ddf58..7e5bc08 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module clickandjoin.app/websocketserver go 1.19 require ( + git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/fasthttp/websocket v1.5.0 // indirect github.com/gocql/gocql v1.3.1 // indirect diff --git a/go.sum b/go.sum index e98fb39..4e5653b 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,43 @@ +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.4 h1:LdnXOQ+rxyb3F0UsUB8PCe2rj13HF7rPU9/AI6XEE2Y= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.4/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.5 h1:zDfrB0gutr3XOHBLbWjH1D51d+4QLgr8VNbGYbBCkbg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.5/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.6 h1:RLJUVs8EfmhlU9IWHTpqnd/65m58yloJ8vb7geHFOOU= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.6/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.7 h1:PLdOzcnNxyrnHOg8pRGUJ9lZ+YIB/J16Vspuoytqx3I= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.7/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.8 h1:uMFmtiaOy8jiqW0HpZ9/9O2tpXplvOjYXIB/spuDib4= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.8/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.9 h1:yU7apejSLPdNyUNi/k6/Ix+QbSjyDLMgxsX2r9dFPrM= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.9/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.10 h1:lzYHPbLKuSe6Uh7fifvRcWy/6ZkszZ9/vAKrhHsBRE0= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.10/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.11 h1:CcdPpf2DcdZiCJ1oGj4ExN3VGMeubmZBdjVmLAFQUdA= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.11/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.12 h1:/KZI3TdUGp8kMb+0xbafvvnvU3lorieJetnKE8KMEmE= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.12/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.13 h1:IKEIiXplAyutn/tMUjLkF5lbfmeWpL9rMcLYaMS0uDM= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.13/go.mod h1:TnJVKLDZ8l/2HFeU6yzSGrxa1A9g4IpXlpLMqbgxnvo= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.14 h1:lfjHLJVILSYhHSIMensVOqlhUrAmISSmEEtpZh8QJAk= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.14/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.15 h1:QVZrkx+/gTt/E7DtGYwGnI9hQ66/D/F3YWwFdBPdtyg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.15/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.16 h1:i9+wrkOQQbhuRU1xMaWTr2Y4JFdOQamTS/BZO8pCMjc= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.16/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.17 h1:Xh62QbcYLj/07r2hoR+Dz9K0vkbuP8oZ2NdspCfXPmc= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.17/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.18 h1:7WzhKv+b/leJoqa9h/kPlEcN3zijHXD88aPYtCboWhY= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.18/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.19 h1:RDvCD1EtTMe+eDWitGnyA6DwlmO10NqfXLRvGw1Ieho= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.19/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.20 h1:0fFXnoqdnzXxmLRBf3Rnp9UTuUT0zmrnIxO7OA+IZao= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.20/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.21 h1:6VgGO8GsrPtBfS6tcamw2OFRt4XplIKAY6zpgNbCvdg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.21/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.22 h1:26wm+XTVNsoQxg8RHzkwS/s5ZKSjTvjQXg2GTM7Pwgw= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.22/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 h1:FYMFWE9gXAXtvLzyVmv9yp/AmQos1OfTuVZy6DnQrEw= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= diff --git a/modules/rabbitmq/helper.go b/modules/rabbitmq/helper.go new file mode 100644 index 0000000..22251de --- /dev/null +++ b/modules/rabbitmq/helper.go @@ -0,0 +1,220 @@ +package rabbitmq + +import ( + "context" + "encoding/json" + + "clickandjoin.app/websocketserver/modules/structs" + "clickandjoin.app/websocketserver/modules/utils" + "clickandjoin.app/websocketserver/socketclients" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { + msg, err := json.Marshal(rabbitMqMessage) + + if err != nil { + logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err) + return err + } + + err = WebsocketClient.PushExchangeMessage(rabbitMqMessage.Rec, msg) + + if err != nil { + logrus.Errorln("Failed to publish client msg, err:", err) + return err + } + + return nil +} + +// For each user a queue is created in the websocket exchange channel where he receives his messages +func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err error) { + msgs, err := createWSClientQueueAndBindingConsumer(socketClient, id) + + if err != nil { + logrus.Errorln("Failed to create ws client queue and binding consumer, err:", err) + return err + } + + // canceled when websocket client disconnects + ctx, cancel := context.WithCancel(context.Background()) + + socketClient.CancelFunc = cancel + + go func() { + for { + select { + case <-ctx.Done(): + logrus.Println("DONE!") + return + case <-WebSocketChannelClosedChannel: + // This case handles the event of closed channel e.g. abnormal shutdown + msgs, err = createWSClientQueueAndBindingConsumer(socketClient, id) + + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + continue + } + + logrus.Debugln("Re-set ws client channel", socketClient.RabbitMqConsumerId) + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + WebSocketChannelClosedChannel = make(chan *amqp.Error, 1) + WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel) + + case msg := <-msgs: + var receivedMessage structs.ReceivedMessage + + err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage) + + err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + + if err != nil { + logrus.Errorln("Failed to send message to client, err:", err) + } + } + } + }() + + return nil +} + +func createWSClientQueueAndBindingConsumer(socketClient *structs.SocketClient, id string) (msgs <-chan amqp.Delivery, err error) { + q, err := WebsocketClient.QueueDeclare(gorabbitmqclient.QueueDeclareSettings{ + Name: "", + Durable: false, + DeleteWhenUnused: false, + Exclusive: true, + NoWait: false, + Arguments: nil, + }) + + if err != nil { + return msgs, err + } + + // set an ID for the consumer to delete the consumer after the socket user has disconnected + socketClient.RabbitMqConsumerId = uuid.New().String() + + // store queue name in the client to delete it when the client disconnects from the websocket + socketClient.RabbitMqQueueName = q.Name + + err = WebsocketClient.QueueBind(gorabbitmqclient.QueueBindSettings{ + QueueName: q.Name, + RoutingKey: id, + Exchange: exchangeWebsocketMessages, + NoWait: false, + Arguments: nil, + }) + + if err != nil { + logrus.Errorln("Failed to bind queue, err:", err) + return msgs, err + } + + return WebsocketClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{ + QueueName: q.Name, + Consumer: socketClient.RabbitMqConsumerId, + AutoAck: true, + Exclusive: false, + NoLocal: false, + NoWait: false, + Arguments: nil, + }) +} + +func DeleteWSClient(consumerId string, qName string) error { + // delete consumer + err := WebsocketClient.Channel.Cancel(consumerId, false) + + if err != nil { + logrus.Errorln("Failed to delete consumer, err:", err) + return err + } + + // delete queue and queue binding + _, err = WebsocketClient.Channel.QueueDelete(qName, false, true, false) + + if err != nil { + logrus.Errorln("Failed to delete queue, err:", err) + return err + } + + logrus.Println("deleted", consumerId, qName) + + return nil +} + +func ApiBroadcastMessagesHandling() { + msgs, err := apiBroadcastMessagesConsumer() + + if err != nil { + logrus.Errorln("Failed to create api broadcast message consumer, err:", err) + return + } + + channelClosedChannel := make(chan *amqp.Error, 1) + ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel) + + go func() { + for { + select { + case <-channelClosedChannel: + // This case handles the event of closed channel e.g. abnormal shutdown + msgs, err = apiBroadcastMessagesConsumer() + + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + continue + } + + logrus.Debugln("Re-set api broadcast channel") + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + channelClosedChannel = make(chan *amqp.Error, 1) + WebsocketClient.Channel.NotifyClose(channelClosedChannel) + + case msg := <-msgs: + var receivedMessage structs.ReceivedMessage + + err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) + + socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + } + } + }() +} + +func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) { + return ApiBroadcastClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{ + QueueName: ApiBroadcastClient.AssignedQueueName, + Consumer: "", + AutoAck: true, + Exclusive: false, + NoLocal: false, + NoWait: false, + Arguments: nil, + }) +} diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index fdbfbe8..2a38b39 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -1,300 +1,101 @@ package rabbitmq import ( - "context" - "encoding/json" - "fmt" "time" "clickandjoin.app/websocketserver/modules/config" - "clickandjoin.app/websocketserver/modules/structs" - "clickandjoin.app/websocketserver/modules/utils" - "clickandjoin.app/websocketserver/socketclients" - "github.com/google/uuid" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" ) -var Conn *amqp.Connection -var Channel *amqp.Channel - const ( exchangeWebsocketMessages = "cnj.websocketserver.messages" exchangeBroadcastMessages = "cnj.api.broadcast.messages" ) -func getConnectionString() string { - cfg := &config.Cfg.RabbitMq - - return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) -} +var ( + WebsocketClient *gorabbitmqclient.Client + ApiBroadcastClient *gorabbitmqclient.Client + ConnectionAddress string + WebSocketChannelClosedChannel chan *amqp.Error // informs when the connection has been lost +) func Init() { - conn, err := amqp.Dial(getConnectionString()) + cfg := config.Cfg - if err != nil { - logrus.Fatalln("RabbitMQ connection failed, err:", err) - } - - ch, err := conn.Channel() - - if err != nil { - logrus.Fatalln(err) - } - - Channel = ch + ConnectionAddress = gorabbitmqclient.GetConnectionString(cfg.RabbitMq.Username, cfg.RabbitMq.Password, cfg.RabbitMq.Host) /* * websocketserver messages */ - // creates a new exchange if one does not already exist - - err = ch.ExchangeDeclare( - exchangeWebsocketMessages, // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) - - if err != nil { - logrus.Fatalln("Failed to declare exchange, err:", err) - } + WebsocketClient = gorabbitmqclient.NewClient( + cfg.Debug, + gorabbitmqclient.ExchangeSettings{ + Name: exchangeWebsocketMessages, + Type: "direct", + Durable: true, + AutoDeleted: false, + Internal: false, + NoWait: false, + Arguments: nil, + ExchangePublishSettings: gorabbitmqclient.ExchangePublishSettings{ + Mandatory: false, + Immediate: false, + ContentType: gorabbitmqclient.ContentTypeJson, + }, + }, + gorabbitmqclient.QueueSettings{}, + gorabbitmqclient.ChannelQosSettingsDefault, + gorabbitmqclient.Config{ + ReconnectDelay: 1 * time.Second, + ReInitDelay: 1 * time.Second, + ResendDelay: 5 * time.Second, + }, + ConnectionAddress) /* * api broadcast messages */ - q, err := ch.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) + ApiBroadcastClient = gorabbitmqclient.NewClient( + cfg.Debug, + gorabbitmqclient.ExchangeSettings{}, + gorabbitmqclient.QueueSettings{ + Name: "", + Durable: false, + DeleteWhenUnused: false, + Exclusive: true, + NoWait: false, + Arguments: nil, + QueueBindSettings: gorabbitmqclient.QueueBindSettings{ + Enabled: true, + QueueName: gorabbitmqclient.SetQueueNameToAutomaticallyAssignedQueueName, + RoutingKey: "", + Exchange: exchangeBroadcastMessages, + NoWait: false, + Arguments: nil, + }, + }, + gorabbitmqclient.ChannelQosSettingsDefault, + gorabbitmqclient.Config{ + ReconnectDelay: 1 * time.Second, + ReInitDelay: 1 * time.Second, + ResendDelay: 5 * time.Second, + }, + ConnectionAddress) - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } + // give the connection sometime to setup + <-time.After(time.Second) - err = ch.QueueBind( - q.Name, // queue name - "", // routing key - exchangeBroadcastMessages, // exchange - false, - nil, - ) + /* + * websocketserver messages + */ + WebSocketChannelClosedChannel = make(chan *amqp.Error, 1) + WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel) - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - - if err != nil { - logrus.Fatalln("Failed to register consumer, err:", err) - } - - go func() { - for d := range msgs { - var receivedMessage structs.ReceivedMessage - - err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) - - if err != nil { - logrus.Errorln("Failed to unmarshal received msg, err:", err) - } - - logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) - - socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) - } - }() + /* + * api broadcast messages + */ + go ApiBroadcastMessagesHandling() } - -func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueName string, err error) { - q, err := Channel.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - - if err != nil { - logrus.Errorln("Failed to declare queue, err:", err) - return "", err - } - - err = Channel.QueueBind( - q.Name, // name - id, // key - exchangeWebsocketMessages, // exchange - false, // no-wait - nil, // arguments - ) - - if err != nil { - logrus.Errorln("Failed to bind queue, err:", err) - return "", err - } - - // set an ID for the consumer to delete the consumer after the socket user has disconnected - socketClient.RabbitMqConsumerId = uuid.New().String() - - msgs, err := Channel.Consume( - q.Name, // queue - socketClient.RabbitMqConsumerId, // consumer - true, // auto ack - false, // exclusive - false, // no local - false, // no wait - nil, // args - ) - - if err != nil { - logrus.Errorln("Failed to register consumer, err:", err) - return "", err - } - - go func() { - for d := range msgs { - var receivedMessage structs.ReceivedMessage - - err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) - - if err != nil { - logrus.Errorln("Failed to unmarshal received msg, err:", err) - } - - logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage) - - err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) - - if err != nil { - logrus.Errorln("Failed to send message to client, err:", err) - } - } - }() - - return q.Name, nil -} - -func DeleteClient(consumerId string, qName string) error { - // delete consumer - err := Channel.Cancel(consumerId, false) - - if err != nil { - logrus.Errorln("Failed to delete consumer, err:", err) - return err - } - - // delete queue and queue binding - _, err = Channel.QueueDelete(qName, false, true, false) - - if err != nil { - logrus.Errorln("Failed to delete queue, err:", err) - return err - } - - return nil -} - -func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error { - msg, err := json.Marshal(rabbitMqMessage) - - if err != nil { - logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err) - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - err = Channel.PublishWithContext(ctx, - exchangeWebsocketMessages, // exchange - rabbitMqMessage.Rec, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: msg, - }) - - if err != nil { - logrus.Errorln("Failed to publish client msg, err:", err) - return err - } - - return nil -} - -/* -func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { - msg, err := json.Marshal(rabbitMqMessage) - - if err != nil { - logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err) - return err - } - - err = publishMessage(msg, MessagesQueue.Name) - - return err -} - -func publishMessage(body []byte, channelName string) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - err := Channel.PublishWithContext(ctx, - "messages", // exchange - "", // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: body, - }) - - if err != nil { - logrus.Errorln("Failed to publish a message, err:", err) - return err - } - - logrus.Printf("[x] Sent %s\n", body) - - return nil -} */ - -/* -func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) { - q, err := channel.QueueDeclare( - name, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - - if err != nil { - log.Fatalln("Failed to declare a queue", err) - } - - *queue = q -} -*/ diff --git a/modules/structs/socket.go b/modules/structs/socket.go index 9dac536..7079182 100644 --- a/modules/structs/socket.go +++ b/modules/structs/socket.go @@ -1,6 +1,7 @@ package structs import ( + "context" "encoding/json" "sync" @@ -13,6 +14,7 @@ type SocketClient struct { connMu sync.Mutex RabbitMqQueueName string RabbitMqConsumerId string + CancelFunc context.CancelFunc } type SocketMessage struct { diff --git a/socketserver/hub.go b/socketserver/hub.go index f446c4e..3702147 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -21,15 +21,13 @@ func RunHub() { case newSocketClient := <-register: uuid := uuid.New().String() - queueName, err := rabbitmq.CreateClientBinding(newSocketClient, uuid) + err := rabbitmq.CreateWSClientBinding(newSocketClient, uuid) if err != nil { logrus.Errorln("Failed to create client binding, err:", err) break } - newSocketClient.RabbitMqQueueName = queueName - cache.SocketClients[uuid] = newSocketClient logrus.Debugln("REGISTER CLIENT:", uuid) @@ -79,7 +77,8 @@ func RunHub() { logrus.Debugln("UNREGISTER CLIENT:", id) delete(cache.SocketClients, id) - rabbitmq.DeleteClient(client.RabbitMqConsumerId, client.RabbitMqQueueName) + client.CancelFunc() + rabbitmq.DeleteWSClient(client.RabbitMqConsumerId, client.RabbitMqQueueName) } } }