MailerServer/modules/rabbitmq/rabbitmq.go

102 lines
2.0 KiB
Go

package rabbitmq
import (
"encoding/json"
"fmt"
"strings"
"clickandjoin.app/emailserver/mailer"
"clickandjoin.app/emailserver/modules/config"
"clickandjoin.app/emailserver/modules/structs"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
var Conn *amqp.Connection
var Channel *amqp.Channel
const queueMails = "cnj.mails"
func getConnectionString() string {
cfg := &config.Cfg.RabbitMq
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
}
func Init() {
conn, err := amqp.Dial(getConnectionString())
if err != nil {
logrus.Fatalln("RabbitMQ connection failed, err:", err)
}
ch, err := conn.Channel()
if err != nil {
logrus.Fatalln(err)
}
Channel = ch
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
logrus.Fatalln("Failed to set Qos, err:", err)
}
msgs, err := ch.Consume(
queueMails, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to consume mails, err:", err)
}
var forever chan struct{}
go func() {
for d := range msgs {
logrus.Debugln("RECV MAIL")
mailMessage := structs.RabbitMqMailMessage{}
if err := json.Unmarshal(d.Body, &mailMessage); err != nil {
logrus.Errorln("Failed to unmarshal json message, err:", err)
continue
}
logrus.Println(mailMessage.UserMail)
// only for testing
if !strings.HasSuffix(mailMessage.UserMail, "@roese.dev") && !strings.HasSuffix(mailMessage.UserMail, "@umbach.dev") {
mailMessage.UserMail = "info@clickandjoin.de"
}
err = mailer.NewMail(structs.Mail{
To: []string{mailMessage.UserMail},
TemplateId: mailMessage.TemplateId,
LanguageId: mailMessage.LanguageId,
BodyData: mailMessage.BodyData})
if err != nil {
logrus.Errorln("Failed to send new mail, err:", err)
continue
}
d.Ack(false)
}
}()
<-forever
}