From a0253ec239d7b6d41b813a0e519b2068fd190a48 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 8 Jan 2023 21:21:01 +0100 Subject: [PATCH] implemented gorabbitmqclient --- go.mod | 1 + go.sum | 4 ++ modules/rabbitmq/helper.go | 88 ++++++++++++++++++++++++++++++++++++ modules/rabbitmq/rabbitmq.go | 39 ++++++++++------ 4 files changed, 119 insertions(+), 13 deletions(-) create mode 100644 modules/rabbitmq/helper.go diff --git a/go.mod b/go.mod index 9c2c128..20e1345 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module clickandjoin.app/emailserver go 1.19 require ( + git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24 // indirect github.com/rabbitmq/amqp091-go v1.5.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/tdewolff/minify/v2 v2.12.4 // indirect diff --git a/go.sum b/go.sum index 79f27c7..f3786ad 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 h1:FYMFWE9gXAXtvLzyVmv9yp/AmQos1OfTuVZy6DnQrEw= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24 h1:rvBwQLc5keM6kkAGL9qNQiOlvT5GDfgnYDn3viHVDR8= +git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/modules/rabbitmq/helper.go b/modules/rabbitmq/helper.go new file mode 100644 index 0000000..018cba1 --- /dev/null +++ b/modules/rabbitmq/helper.go @@ -0,0 +1,88 @@ +package rabbitmq + +import ( + "encoding/json" + "strings" + + "clickandjoin.app/emailserver/mailer" + "clickandjoin.app/emailserver/modules/structs" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func MailMessagesHandling() { + msgs, err := mailMessagesConsumer() + + if err != nil { + logrus.Errorln("Failed to create ws client queue and binding consumer, err:", err) + return + } + + channelClosedChannel := make(chan *amqp.Error, 1) + MailsClient.Channel.NotifyClose(channelClosedChannel) + + for { + select { + case <-channelClosedChannel: + // This case handles the event of closed channel e.g. abnormal shutdown + msgs, err = mailMessagesConsumer() + + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + continue + } + + logrus.Debugln("Re-set api broadcast channel") + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + channelClosedChannel = make(chan *amqp.Error, 1) + MailsClient.Channel.NotifyClose(channelClosedChannel) + + case msg := <-msgs: + logrus.Debugln("RECV MAIL") + + mailMessage := structs.RabbitMqMailMessage{} + + if err := json.Unmarshal(msg.Body, &mailMessage); err != nil { + logrus.Errorln("Failed to unmarshal json message, err:", err) + continue + } + + logrus.Println(mailMessage.UserMail) + + // only for testing + if !strings.HasSuffix(mailMessage.UserMail, "@roese.dev") && !strings.HasSuffix(mailMessage.UserMail, "@umbach.dev") { + mailMessage.UserMail = "info@clickandjoin.de" + } + + err = mailer.NewMail(structs.Mail{ + To: []string{mailMessage.UserMail}, + TemplateId: mailMessage.TemplateId, + LanguageId: mailMessage.LanguageId, + BodyData: mailMessage.BodyData}) + + if err != nil { + logrus.Errorln("Failed to send new mail, err:", err) + continue + } + + msg.Ack(false) + } + } +} + +func mailMessagesConsumer() (msgs <-chan amqp.Delivery, err error) { + return MailsClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{ + QueueName: queueMails, + Consumer: "", + AutoAck: false, + Exclusive: false, + NoLocal: false, + NoWait: false, + Arguments: nil, + }) +} diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go index 5660534..5e5d124 100644 --- a/modules/rabbitmq/rabbitmq.go +++ b/modules/rabbitmq/rabbitmq.go @@ -1,28 +1,40 @@ package rabbitmq import ( - "encoding/json" - "fmt" - "strings" + "time" - "clickandjoin.app/emailserver/mailer" "clickandjoin.app/emailserver/modules/config" - "clickandjoin.app/emailserver/modules/structs" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" + gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client" ) -var Conn *amqp.Connection -var Channel *amqp.Channel - const queueMails = "cnj.mails" -func getConnectionString() string { - cfg := &config.Cfg.RabbitMq +var MailsClient *gorabbitmqclient.Client - return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) +func Init() { + cfg := config.Cfg + + MailsClient = gorabbitmqclient.NewClient( + cfg.Debug, + gorabbitmqclient.ExchangeSettings{}, + gorabbitmqclient.QueueSettings{}, + gorabbitmqclient.ChannelQosSettingsDefault, + gorabbitmqclient.Config{ + ReconnectDelay: 1 * time.Second, + ReInitDelay: 1 * time.Second, + ResendDelay: 5 * time.Second, + }, + gorabbitmqclient.GetConnectionString( + cfg.RabbitMq.Username, + cfg.RabbitMq.Password, + cfg.RabbitMq.Host)) + + <-time.After(time.Second) + + MailMessagesHandling() } +/* func Init() { conn, err := amqp.Dial(getConnectionString()) @@ -99,3 +111,4 @@ func Init() { <-forever } +*/