package rabbitmq

import (
	"encoding/json"
	"time"

	"clickandjoin.app/emailserver/mailer"
	"clickandjoin.app/emailserver/modules/structs"
	gocnjhelper "git.ex.umbach.dev/ClickandJoin/go-cnj-helper"
	amqp "github.com/rabbitmq/amqp091-go"
)

// MailMessagesHandling consumes deliveries from the mail queue and sends one
// mail per delivery.
//
// It returns only when the initial consumer cannot be created; otherwise it
// loops forever, re-creating the consumer whenever the AMQP channel reports
// that it has been closed.
func MailMessagesHandling() {
	// Pause between attempts to re-create the consumer. Without it the loop
	// would spin at full speed, because a closed notification channel is
	// always ready to receive from.
	const reconsumeDelay = time.Second

	msgs, err := mailMessagesConsumer()
	if err != nil {
		gocnjhelper.LogErrorf("Failed to create mail queue and binding consumer, err: %s", err)
		return
	}

	channelClosedChannel := make(chan *amqp.Error, 1)
	MailsClient.Channel.NotifyClose(channelClosedChannel)

	for {
		select {
		case <-channelClosedChannel:
			// This case handles the event of a closed channel, e.g. an
			// abnormal shutdown.
			msgs, err = mailMessagesConsumer()
			if err != nil {
				// The AMQP channel is not ready yet. The next iteration
				// enters this case again because channelClosedChannel has
				// been closed by the library. A nil msgs keeps the delivery
				// case from firing in the meantime.
				msgs = nil
				time.Sleep(reconsumeDelay)
				continue
			}

			gocnjhelper.LogDebug("Re-set api broadcast channel")

			// Re-register for close notifications: the library closes the
			// previous notification channel after an abnormal shutdown.
			channelClosedChannel = make(chan *amqp.Error, 1)
			MailsClient.Channel.NotifyClose(channelClosedChannel)

		case msg, ok := <-msgs:
			if !ok {
				// The deliveries channel was closed. Receiving from it again
				// would only yield zero-value deliveries, so park this case
				// (a nil channel never becomes ready) until the close
				// notification case has re-created the consumer.
				gocnjhelper.LogDebug("Mail deliveries channel closed, waiting for channel close notification")
				msgs = nil
				continue
			}

			handleMailDelivery(msg)
		}
	}
}

// handleMailDelivery decodes a single delivery into a mail message, sends the
// mail and acknowledges the delivery once the mail has been sent.
func handleMailDelivery(msg amqp.Delivery) {
	gocnjhelper.LogDebug("RECV MAIL")
	gocnjhelper.LogDebug(string(msg.Body))

	var mailMessage structs.RabbitMqMailMessage

	if err := json.Unmarshal(msg.Body, &mailMessage); err != nil {
		gocnjhelper.LogErrorf("Failed to unmarshal json message, err: %s", err)

		// A malformed body can never be decoded, so reject it without
		// requeueing instead of leaving it unacknowledged.
		if err := msg.Reject(false); err != nil {
			gocnjhelper.LogErrorf("Failed to reject malformed mail message, err: %s", err)
		}
		return
	}

	mail := structs.Mail{
		To:         []string{mailMessage.UserMail},
		TemplateId: mailMessage.TemplateId,
		LanguageId: mailMessage.LanguageId,
		BodyData:   mailMessage.BodyData,
	}

	if err := mailer.NewMail(mail); err != nil {
		gocnjhelper.LogErrorf("Failed to send new mail, err: %s", err)
		// NOTE(review): the delivery is intentionally left unacknowledged
		// here, as before. Presumably the broker redelivers it once the
		// channel is closed; confirm whether an explicit Nack with requeue
		// or a dead-letter queue is wanted instead.
		return
	}

	gocnjhelper.LogDebugf("Mail sent to: %s bodyData: %s templateId: %s languageId: %s",
		mailMessage.UserMail, mailMessage.BodyData, mailMessage.TemplateId, mailMessage.LanguageId)

	if err := msg.Ack(false); err != nil {
		gocnjhelper.LogErrorf("Failed to ack mail message, err: %s", err)
	}
}

// mailMessagesConsumer starts a consumer on the mail queue with manual
// acknowledgements and returns the resulting deliveries channel.
func mailMessagesConsumer() (<-chan amqp.Delivery, error) {
	return MailsClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
		QueueName: queueMails,
		Consumer:  "",
		AutoAck:   false,
		Exclusive: false,
		NoLocal:   false,
		NoWait:    false,
		Arguments: nil,
	})
}