implemented go-rabbitmq-client

alpha
alex 2023-01-08 19:58:31 +01:00
parent d8ee159296
commit 3bd81e27d6
4 changed files with 113 additions and 83 deletions

1
go.mod
View File

@ -3,6 +3,7 @@ module clickandjoin.app/serversenteventsserver
go 1.19
require (
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/gofiber/adaptor/v2 v2.1.30 // indirect
github.com/gofiber/fiber/v2 v2.40.1 // indirect

2
go.sum
View File

@ -1,3 +1,5 @@
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 h1:FYMFWE9gXAXtvLzyVmv9yp/AmQos1OfTuVZy6DnQrEw=
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -0,0 +1,74 @@
package rabbitmq
import (
"clickandjoin.app/serversenteventsserver/modules/cache"
"clickandjoin.app/serversenteventsserver/modules/structs"
"clickandjoin.app/serversenteventsserver/modules/utils"
gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
func ApiBroadcastMessagesHandling() {
msgs, err := apiBroadcastMessagesConsumer()
if err != nil {
logrus.Errorln("Failed to create api broadcast message consumer, err:", err)
return
}
channelClosedChannel := make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
go func() {
for {
select {
case <-channelClosedChannel:
// This case handles the event of closed channel e.g. abnormal shutdown
msgs, err = apiBroadcastMessagesConsumer()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
continue
}
logrus.Debugln("Re-set api broadcast channel")
// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
channelClosedChannel = make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
case msg := <-msgs:
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)
logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS")
for id, sseClient := range cache.SSEClients {
sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(msg.Body)}
}
}
}
}()
}
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
return ApiBroadcastClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
QueueName: ApiBroadcastClient.AssignedQueueName,
Consumer: "",
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
Arguments: nil,
})
}

View File

@ -1,101 +1,54 @@
package rabbitmq
import (
"fmt"
"time"
"clickandjoin.app/serversenteventsserver/modules/cache"
"clickandjoin.app/serversenteventsserver/modules/config"
"clickandjoin.app/serversenteventsserver/modules/structs"
"clickandjoin.app/serversenteventsserver/modules/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
)
var Conn *amqp.Connection
var Channel *amqp.Channel
const exchangeBroadcastMessages = "cnj.api.broadcast.messages"
func getConnectionString() string {
cfg := &config.Cfg.RabbitMq
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
}
var ApiBroadcastClient *gorabbitmqclient.Client
func Init() {
conn, err := amqp.Dial(getConnectionString())
if err != nil {
logrus.Fatalln("RabbitMQ connection failed, err:", err)
}
ch, err := conn.Channel()
if err != nil {
logrus.Fatalln(err)
}
Channel = ch
cfg := config.Cfg
/*
* api broadcast messages
*/
ApiBroadcastClient = gorabbitmqclient.NewClient(
cfg.Debug,
gorabbitmqclient.ExchangeSettings{},
gorabbitmqclient.QueueSettings{
Name: "",
Durable: false,
DeleteWhenUnused: false,
Exclusive: true,
NoWait: false,
Arguments: nil,
QueueBindSettings: gorabbitmqclient.QueueBindSettings{
Enabled: true,
QueueName: gorabbitmqclient.SetQueueNameToAutomaticallyAssignedQueueName,
RoutingKey: "",
Exchange: exchangeBroadcastMessages,
NoWait: false,
Arguments: nil,
},
},
gorabbitmqclient.ChannelQosSettingsDefault,
gorabbitmqclient.Config{
ReconnectDelay: 1 * time.Second,
ReInitDelay: 1 * time.Second,
ResendDelay: 5 * time.Second,
},
gorabbitmqclient.GetConnectionString(
cfg.RabbitMq.Username,
cfg.RabbitMq.Password,
cfg.RabbitMq.Host))
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
// give the connection sometime to setup
<-time.After(time.Second)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeBroadcastMessages, // exchange
false,
nil,
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to register consumer, err:", err)
}
go func() {
for d := range msgs {
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)
logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS")
for id, sseClient := range cache.SSEClients {
sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(d.Body)}
}
}
}()
go ApiBroadcastMessagesHandling()
}