From 3bd81e27d66bdef41ecc7365358e09c3f5e85f8a Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 8 Jan 2023 19:58:31 +0100 Subject: [PATCH] implemented go-rabbitmq-client --- go.mod | 1 + go.sum | 2 + modules/rabbitmq/helper.go | 74 ++++++++++++++++++++++ modules/rabbitmq/rabbitmq.go | 119 +++++++++++------------------------ 4 files changed, 113 insertions(+), 83 deletions(-) create mode 100644 modules/rabbitmq/helper.go diff --git a/go.mod b/go.mod index d59c33d..25eee8f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module clickandjoin.app/serversenteventsserver go 1.19 require ( + git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/gofiber/adaptor/v2 v2.1.30 // indirect github.com/gofiber/fiber/v2 v2.40.1 // indirect diff --git a/go.sum b/go.sum index b96580d..f3f31d0 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 h1:FYMFWE9gXAXtvLzyVmv9yp/AmQos1OfTuVZy6DnQrEw= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/modules/rabbitmq/helper.go b/modules/rabbitmq/helper.go new file mode 100644 index 0000000..54a8a57 --- /dev/null +++ b/modules/rabbitmq/helper.go @@ -0,0 +1,74 @@ +package rabbitmq + +import ( + "clickandjoin.app/serversenteventsserver/modules/cache" + "clickandjoin.app/serversenteventsserver/modules/structs" + "clickandjoin.app/serversenteventsserver/modules/utils" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func ApiBroadcastMessagesHandling() { + msgs, err := apiBroadcastMessagesConsumer() + + if err != nil { + logrus.Errorln("Failed to create api broadcast message consumer, err:", err) + return + } + + channelClosedChannel := make(chan *amqp.Error, 1) + ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel) + + go func() { + for { + select { + case <-channelClosedChannel: + // This case handles the event of closed channel e.g. abnormal shutdown + msgs, err = apiBroadcastMessagesConsumer() + + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + continue + } + + logrus.Debugln("Re-set api broadcast channel") + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + channelClosedChannel = make(chan *amqp.Error, 1) + ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel) + + case msg := <-msgs: + var receivedMessage structs.ReceivedMessage + + err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) + logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS") + + for id, sseClient := range cache.SSEClients { + sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(msg.Body)} + } + } + } + }() +} + +func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) { + return ApiBroadcastClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{ + QueueName: ApiBroadcastClient.AssignedQueueName, + Consumer: "", + AutoAck: true, + Exclusive: false, + NoLocal: false, + NoWait: false, + Arguments: nil, + }) +} diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index d921b0d..9af5306 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -1,101 +1,54 @@ package rabbitmq import ( - "fmt" + "time" - "clickandjoin.app/serversenteventsserver/modules/cache" "clickandjoin.app/serversenteventsserver/modules/config" - "clickandjoin.app/serversenteventsserver/modules/structs" - "clickandjoin.app/serversenteventsserver/modules/utils" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" ) -var Conn *amqp.Connection -var Channel *amqp.Channel - const exchangeBroadcastMessages = "cnj.api.broadcast.messages" -func getConnectionString() string { - cfg := &config.Cfg.RabbitMq - - return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) -} +var ApiBroadcastClient *gorabbitmqclient.Client func Init() { - conn, err := amqp.Dial(getConnectionString()) - - if err != nil { - logrus.Fatalln("RabbitMQ connection failed, err:", err) - } - - ch, err := conn.Channel() - - if err != nil { - logrus.Fatalln(err) - } - - Channel = ch + cfg := config.Cfg /* * api broadcast messages */ + ApiBroadcastClient = gorabbitmqclient.NewClient( + cfg.Debug, + gorabbitmqclient.ExchangeSettings{}, + gorabbitmqclient.QueueSettings{ + Name: "", + Durable: false, + DeleteWhenUnused: false, + Exclusive: true, + NoWait: false, + Arguments: nil, + QueueBindSettings: gorabbitmqclient.QueueBindSettings{ + Enabled: true, + QueueName: gorabbitmqclient.SetQueueNameToAutomaticallyAssignedQueueName, + RoutingKey: "", + Exchange: exchangeBroadcastMessages, + NoWait: false, + Arguments: nil, + }, + }, + gorabbitmqclient.ChannelQosSettingsDefault, + gorabbitmqclient.Config{ + ReconnectDelay: 1 * time.Second, + ReInitDelay: 1 * time.Second, + ResendDelay: 5 * time.Second, + }, + gorabbitmqclient.GetConnectionString( + cfg.RabbitMq.Username, + cfg.RabbitMq.Password, + cfg.RabbitMq.Host)) - q, err := ch.QueueDeclare( - "", // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) + // give the connection sometime to setup + <-time.After(time.Second) - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } - - err = ch.QueueBind( - q.Name, // queue name - "", // routing key - exchangeBroadcastMessages, // exchange - false, - nil, - ) - - if err != nil { - logrus.Fatalln("Failed to declare queue, err:", err) - } - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - - if err != nil { - logrus.Fatalln("Failed to register consumer, err:", err) - } - - go func() { - for d := range msgs { - var receivedMessage structs.ReceivedMessage - - err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) - - if err != nil { - logrus.Errorln("Failed to unmarshal received msg, err:", err) - } - - logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) - logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS") - - for id, sseClient := range cache.SSEClients { - sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(d.Body)} - } - } - }() + go ApiBroadcastMessagesHandling() }