implemented gorabbitmqclient

alpha
alex 2023-01-08 21:21:01 +01:00
parent 488a179dda
commit a0253ec239
4 changed files with 119 additions and 13 deletions

1
go.mod
View File

@ -3,6 +3,7 @@ module clickandjoin.app/emailserver
go 1.19
require (
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24 // indirect
github.com/rabbitmq/amqp091-go v1.5.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/tdewolff/minify/v2 v2.12.4 // indirect

4
go.sum
View File

@ -1,3 +1,7 @@
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23 h1:FYMFWE9gXAXtvLzyVmv9yp/AmQos1OfTuVZy6DnQrEw=
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.23/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg=
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24 h1:rvBwQLc5keM6kkAGL9qNQiOlvT5GDfgnYDn3viHVDR8=
git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client v1.0.24/go.mod h1:KPbHNtFhttE/TtXZcorq6LKpCigkTaF0qzmB2p7nFsg=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -0,0 +1,88 @@
package rabbitmq
import (
"encoding/json"
"strings"
"clickandjoin.app/emailserver/mailer"
"clickandjoin.app/emailserver/modules/structs"
gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
func MailMessagesHandling() {
msgs, err := mailMessagesConsumer()
if err != nil {
logrus.Errorln("Failed to create ws client queue and binding consumer, err:", err)
return
}
channelClosedChannel := make(chan *amqp.Error, 1)
MailsClient.Channel.NotifyClose(channelClosedChannel)
for {
select {
case <-channelClosedChannel:
// This case handles the event of closed channel e.g. abnormal shutdown
msgs, err = mailMessagesConsumer()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
continue
}
logrus.Debugln("Re-set api broadcast channel")
// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
channelClosedChannel = make(chan *amqp.Error, 1)
MailsClient.Channel.NotifyClose(channelClosedChannel)
case msg := <-msgs:
logrus.Debugln("RECV MAIL")
mailMessage := structs.RabbitMqMailMessage{}
if err := json.Unmarshal(msg.Body, &mailMessage); err != nil {
logrus.Errorln("Failed to unmarshal json message, err:", err)
continue
}
logrus.Println(mailMessage.UserMail)
// only for testing
if !strings.HasSuffix(mailMessage.UserMail, "@roese.dev") && !strings.HasSuffix(mailMessage.UserMail, "@umbach.dev") {
mailMessage.UserMail = "info@clickandjoin.de"
}
err = mailer.NewMail(structs.Mail{
To: []string{mailMessage.UserMail},
TemplateId: mailMessage.TemplateId,
LanguageId: mailMessage.LanguageId,
BodyData: mailMessage.BodyData})
if err != nil {
logrus.Errorln("Failed to send new mail, err:", err)
continue
}
msg.Ack(false)
}
}
}
func mailMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
return MailsClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
QueueName: queueMails,
Consumer: "",
AutoAck: false,
Exclusive: false,
NoLocal: false,
NoWait: false,
Arguments: nil,
})
}

View File

@ -1,28 +1,40 @@
package rabbitmq
import (
"encoding/json"
"fmt"
"strings"
"time"
"clickandjoin.app/emailserver/mailer"
"clickandjoin.app/emailserver/modules/config"
"clickandjoin.app/emailserver/modules/structs"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
)
var Conn *amqp.Connection
var Channel *amqp.Channel
const queueMails = "cnj.mails"
func getConnectionString() string {
cfg := &config.Cfg.RabbitMq
var MailsClient *gorabbitmqclient.Client
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
func Init() {
cfg := config.Cfg
MailsClient = gorabbitmqclient.NewClient(
cfg.Debug,
gorabbitmqclient.ExchangeSettings{},
gorabbitmqclient.QueueSettings{},
gorabbitmqclient.ChannelQosSettingsDefault,
gorabbitmqclient.Config{
ReconnectDelay: 1 * time.Second,
ReInitDelay: 1 * time.Second,
ResendDelay: 5 * time.Second,
},
gorabbitmqclient.GetConnectionString(
cfg.RabbitMq.Username,
cfg.RabbitMq.Password,
cfg.RabbitMq.Host))
<-time.After(time.Second)
MailMessagesHandling()
}
/*
func Init() {
conn, err := amqp.Dial(getConnectionString())
@ -99,3 +111,4 @@ func Init() {
<-forever
}
*/