package rabbitmq import ( "encoding/json" "clickandjoin.app/managementsystem/modules/cache" "clickandjoin.app/managementsystem/socketclients" gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" amqp "github.com/rabbitmq/amqp091-go" ) func LogsMessagesHandling() { msgs, err := logsMessagesConsumer() if err != nil { gocnjhelper.LogErrorf("Failed to create logs queue and binding consumer, err: %s", err) return } channelClosedChannel := make(chan *amqp.Error, 1) LogsClient.Channel.NotifyClose(channelClosedChannel) for { select { case <-channelClosedChannel: // This case handles the event of closed channel e.g. abnormal shutdown msgs, err = logsMessagesConsumer() if err != nil { // If the AMQP channel is not ready, it will continue the loop. Next // iteration will enter this case because chClosedCh is closed by the // library continue } gocnjhelper.LogDebug("Re-set logs channel") // Re-set channel to receive notifications // The library closes this channel after abnormal shutdown channelClosedChannel = make(chan *amqp.Error, 1) LogsClient.Channel.NotifyClose(channelClosedChannel) case msg := <-msgs: gocnjhelper.LogDebugf("RECV msg %s", msg) logMessage := gocnjhelper.RabbitMqLogMessage{} if err := json.Unmarshal(msg.Body, &logMessage); err != nil { gocnjhelper.LogErrorf("Failed to unmarshal json message, err: %s", err) continue } socketclients.BroadcastLogMessage(logMessage) cache.AddLastLogMessage(logMessage) msg.Ack(false) } } } func logsMessagesConsumer() (msgs <-chan amqp.Delivery, err error) { return LogsClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{ QueueName: queueLogs, Consumer: "", AutoAck: false, Exclusive: false, NoLocal: false, NoWait: false, Arguments: nil, }) }