package rabbitmq

import (
	"time"

	"clickandjoin.app/serversenteventsserver/modules/cache"
	"clickandjoin.app/serversenteventsserver/modules/structs"
	"clickandjoin.app/serversenteventsserver/modules/utils"
	gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
	amqp "github.com/rabbitmq/amqp091-go"
)

// ApiBroadcastMessagesHandling subscribes to the api broadcast queue and
// starts a goroutine that forwards the body of every received delivery to
// each client in cache.SSEClients.
//
// If the initial subscription fails, the error is logged and no goroutine is
// started. Otherwise the function returns immediately and the goroutine keeps
// running: it has no stop condition and re-subscribes whenever the AMQP
// channel reports that it was closed.
func ApiBroadcastMessagesHandling() {
	// Pause between failed re-subscribe attempts. Once the close-notification
	// channel has been closed it is always ready to receive, so without a
	// pause the select loop below would spin at full speed while the AMQP
	// channel is unavailable.
	const resubscribeDelay = time.Second

	msgs, err := apiBroadcastMessagesConsumer()
	if err != nil {
		gocnjhelper.LogErrorf("Failed to create api broadcast message consumer, err: %s", err)
		return
	}

	channelClosedChannel := make(chan *amqp.Error, 1)
	ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)

	go func() {
		for {
			select {
			case <-channelClosedChannel:
				// The AMQP channel was closed (e.g. abnormal shutdown):
				// try to subscribe again.
				newMsgs, err := apiBroadcastMessagesConsumer()
				if err != nil {
					// The AMQP channel is not ready yet. The next iteration
					// enters this case again because channelClosedChannel is
					// closed by the library.
					gocnjhelper.LogErrorf("Failed to re-create api broadcast message consumer, err: %s", err)
					time.Sleep(resubscribeDelay)
					continue
				}
				msgs = newMsgs

				gocnjhelper.LogDebug("Re-set api broadcast channel")

				// Register a fresh notification channel; the library closes
				// the previous one after an abnormal shutdown.
				channelClosedChannel = make(chan *amqp.Error, 1)
				ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)

			case msg, ok := <-msgs:
				if !ok {
					// The delivery channel was closed. Receiving from it
					// again would yield empty deliveries forever, so park
					// this case (a nil channel never becomes ready) until the
					// branch above installs a new delivery channel.
					gocnjhelper.LogDebug("Api broadcast delivery channel closed, waiting for re-subscription")
					msgs = nil
					continue
				}

				var receivedMessage structs.ReceivedMessage

				// NOTE(review): the decoded message is only used for the
				// debug log below; a payload that fails to decode is still
				// forwarded to the clients as-is. Confirm this is intended.
				if err := utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage); err != nil {
					gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err)
				}

				gocnjhelper.LogDebugf("RABBITMQ RECEIVED BROADCAST MESSAGE: %s", receivedMessage)
				gocnjhelper.LogDebugf("SENDING MSG TO %d CLIENTS", len(cache.SSEClients))

				// Plain channel send: presumably this blocks while a client's
				// MessageChan cannot accept the message — verify buffering at
				// the place the clients are created.
				for id, sseClient := range cache.SSEClients {
					sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(msg.Body)}
				}
			}
		}
	}()
}

// apiBroadcastMessagesConsumer starts consuming from the queue assigned to
// ApiBroadcastClient with auto-acknowledge enabled, and returns the resulting
// delivery channel together with any error reported by the client.
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
	return ApiBroadcastClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
		QueueName: ApiBroadcastClient.AssignedQueueName,
		Consumer:  "",
		AutoAck:   true,
		Exclusive: false,
		NoLocal:   false,
		NoWait:    false,
		Arguments: nil,
	})
}