ServerSentEventsServer/modules/rabbitmq/helper.go

74 lines
2.2 KiB
Go

package rabbitmq
import (
"clickandjoin.app/serversenteventsserver/modules/cache"
"clickandjoin.app/serversenteventsserver/modules/structs"
"clickandjoin.app/serversenteventsserver/modules/utils"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
amqp "github.com/rabbitmq/amqp091-go"
)
func ApiBroadcastMessagesHandling() {
msgs, err := apiBroadcastMessagesConsumer()
if err != nil {
gocnjhelper.LogErrorf("Failed to create api broadcast message consumer, err: %s", err)
return
}
channelClosedChannel := make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
go func() {
for {
select {
case <-channelClosedChannel:
// This case handles the event of closed channel e.g. abnormal shutdown
msgs, err = apiBroadcastMessagesConsumer()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
continue
}
gocnjhelper.LogDebug("Re-set api broadcast channel")
// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
channelClosedChannel = make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
case msg := <-msgs:
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage)
if err != nil {
gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err)
}
gocnjhelper.LogDebugf("RABBITMQ RECEIVED BROADCAST MESSAGE: %s", receivedMessage)
gocnjhelper.LogDebugf("SENDING MSG TO %s %s", len(cache.SSEClients), "CLIENTS")
for id, sseClient := range cache.SSEClients {
sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(msg.Body)}
}
}
}
}()
}
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
return ApiBroadcastClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
QueueName: ApiBroadcastClient.AssignedQueueName,
Consumer: "",
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
Arguments: nil,
})
}