package rabbitmq

import (
	"encoding/json"
	"fmt"
	"time"

	"jannex/log-manager/modules/loghandler"
	"jannex/log-manager/modules/structs"

	gocnjhelper "git.ex.umbach.dev/ClickandJoin/go-cnj-helper"
	amqp "github.com/rabbitmq/amqp091-go"
)

// LogsMessagesHandling consumes deliveries from the logs queue and forwards
// their contents to the log handler.
//
// It returns immediately (after logging) if the initial consumer cannot be
// created; otherwise it loops forever. When the AMQP channel reports that it
// was closed, the consumer is re-created and the close notification is
// re-registered. Failed re-subscription attempts are retried after a short
// pause.
func LogsMessagesHandling() {
	// Pause between failed re-subscription attempts. Without it the loop would
	// spin at full speed, because a closed notification channel is always
	// ready to receive from.
	const retryDelay = time.Second

	msgs, err := mailMessagesConsumer()
	if err != nil {
		gocnjhelper.LogErrorf("Failed to create mail queue and binding consumer, err: %s", err)
		return
	}

	channelClosedChannel := make(chan *amqp.Error, 1)
	LogsClient.Channel.NotifyClose(channelClosedChannel)

	for {
		select {
		case <-channelClosedChannel:
			// The AMQP channel was closed (e.g. abnormal shutdown): try to
			// subscribe again.
			newMsgs, consumeErr := mailMessagesConsumer()
			if consumeErr != nil {
				// The AMQP channel is not ready yet. The notification channel
				// has been closed by the library, so the next iteration enters
				// this case again; wait a moment before retrying.
				gocnjhelper.LogErrorf("Failed to re-create logs consumer, err: %s", consumeErr)
				time.Sleep(retryDelay)
				continue
			}
			msgs = newMsgs

			fmt.Println("Re-set api broadcast channel")

			// Register a fresh notification channel; the library closes the
			// previous one after a shutdown.
			channelClosedChannel = make(chan *amqp.Error, 1)
			LogsClient.Channel.NotifyClose(channelClosedChannel)

		case msg, ok := <-msgs:
			if !ok {
				// The delivery channel was closed. Receiving from it again
				// would only yield empty deliveries, so disable this case (a
				// nil channel is never ready) until the close handler above
				// installs a new consumer.
				// NOTE(review): if the broker cancels only the consumer while
				// the AMQP channel stays open, no close notification arrives
				// and consumption stops here — confirm whether that scenario
				// needs its own handling.
				msgs = nil
				continue
			}

			handleLogDelivery(msg)
		}
	}
}

// handleLogDelivery decodes a single delivery from the logs queue, hands its
// entries to the log handler and acknowledges the delivery. Deliveries whose
// body is not valid JSON are rejected without requeueing so they are not
// redelivered over and over.
func handleLogDelivery(msg amqp.Delivery) {
	fmt.Printf("Received a message: %s\n", msg.Body)

	gocnjhelper.LogDebug(string(msg.Body))

	var logMessages []structs.RabbitMqLogMessage

	if err := json.Unmarshal(msg.Body, &logMessages); err != nil {
		fmt.Printf("Failed to unmarshal json message, err: %s\n", err)

		// The consumer uses manual acknowledgements, so an undecodable
		// delivery must be settled explicitly or it stays unacknowledged.
		if rejectErr := msg.Reject(false); rejectErr != nil {
			gocnjhelper.LogErrorf("Failed to reject malformed log message, err: %s", rejectErr)
		}
		return
	}

	fmt.Println(logMessages)

	// Every entry of one delivery gets the same receive-time prefix (HH:MM:SS).
	date := time.Now().Format("15:04:05")

	logLines := make([]string, 0, len(logMessages))
	for _, logMessage := range logMessages {
		logLines = append(logLines, date+" "+logMessage.Msg)
	}

	// TODO: dynamic type - currently only mailer is used for logs
	loghandler.AddLog(structs.LogBody{
		Type: "mailer",
		Logs: logLines,
	})

	if err := msg.Ack(false); err != nil {
		gocnjhelper.LogErrorf("Failed to ack log message, err: %s", err)
	}
}

// mailMessagesConsumer starts a consumer on the logs queue (queueLogs) with
// manual acknowledgements and returns its delivery channel.
func mailMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
	return LogsClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
		QueueName: queueLogs,
		Consumer:  "",
		AutoAck:   false,
		Exclusive: false,
		NoLocal:   false,
		NoWait:    false,
		Arguments: nil,
	})
}