87 lines
2.3 KiB
Go
87 lines
2.3 KiB
Go
package rabbitmq
|
|
|
|
import (
	"encoding/json"
	"time"

	"clickandjoin.app/emailserver/mailer"
	"clickandjoin.app/emailserver/modules/structs"
	gocnjhelper "git.ex.umbach.dev/ClickandJoin/go-cnj-helper"
	amqp "github.com/rabbitmq/amqp091-go"
)
|
|
|
|
func MailMessagesHandling() {
|
|
msgs, err := mailMessagesConsumer()
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to create mail queue and binding consumer, err: %s", err)
|
|
return
|
|
}
|
|
|
|
channelClosedChannel := make(chan *amqp.Error, 1)
|
|
MailsClient.Channel.NotifyClose(channelClosedChannel)
|
|
|
|
for {
|
|
select {
|
|
case <-channelClosedChannel:
|
|
// This case handles the event of closed channel e.g. abnormal shutdown
|
|
msgs, err = mailMessagesConsumer()
|
|
|
|
if err != nil {
|
|
// If the AMQP channel is not ready, it will continue the loop. Next
|
|
// iteration will enter this case because chClosedCh is closed by the
|
|
// library
|
|
continue
|
|
}
|
|
|
|
gocnjhelper.LogDebug("Re-set api broadcast channel")
|
|
|
|
// Re-set channel to receive notifications
|
|
// The library closes this channel after abnormal shutdown
|
|
channelClosedChannel = make(chan *amqp.Error, 1)
|
|
MailsClient.Channel.NotifyClose(channelClosedChannel)
|
|
|
|
case msg := <-msgs:
|
|
gocnjhelper.LogDebug("RECV MAIL")
|
|
|
|
mailMessage := structs.RabbitMqMailMessage{}
|
|
|
|
gocnjhelper.LogDebug(string(msg.Body))
|
|
|
|
if err := json.Unmarshal(msg.Body, &mailMessage); err != nil {
|
|
gocnjhelper.LogErrorf("Failed to unmarshal json message, err: %s", err)
|
|
continue
|
|
}
|
|
|
|
err = mailer.NewMail(structs.Mail{
|
|
To: []string{mailMessage.UserMail},
|
|
TemplateId: mailMessage.TemplateId,
|
|
LanguageId: mailMessage.LanguageId,
|
|
BodyData: mailMessage.BodyData})
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to send new mail, err: %s", err)
|
|
continue
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("Mail sent to: %s bodyData: %s templateId: %s languageId: %s", mailMessage.UserMail,
|
|
mailMessage.BodyData,
|
|
mailMessage.TemplateId,
|
|
mailMessage.LanguageId)
|
|
|
|
msg.Ack(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func mailMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
|
|
return MailsClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
|
|
QueueName: queueMails,
|
|
Consumer: "",
|
|
AutoAck: false,
|
|
Exclusive: false,
|
|
NoLocal: false,
|
|
NoWait: false,
|
|
Arguments: nil,
|
|
})
|
|
}
|