99 lines
1.8 KiB
Go
99 lines
1.8 KiB
Go
package rabbitmq
|
|
|
|
import (
	"encoding/json"
	"fmt"
	"net/url"

	"git.umbach.dev/app-idea/mailer/modules/config"
	"git.umbach.dev/app-idea/mailer/modules/mailer"
	log "github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
)
|
|
|
|
var Conn *amqp.Connection
|
|
var MailQueue amqp.Queue
|
|
var Channel *amqp.Channel
|
|
|
|
// MailInput is the JSON payload carried by a message on the mail queue.
// The short JSON keys are the wire format and must not change.
type MailInput struct {
	// Mail is read from key "m"; Init passes it to mailer.NewMail as the
	// only recipient.
	Mail string `json:"m"`

	// TemplateId is read from key "t" and forwarded to mailer.NewMail.
	TemplateId int `json:"t"`

	// LanguageId is read from key "l" and forwarded to mailer.NewMail.
	LanguageId int `json:"l"`

	// BodyData is read from key "d" and forwarded to mailer.NewMail
	// unchanged.
	BodyData map[string]interface{} `json:"d"`
}
|
|
|
|
func getConnectionString() string {
|
|
cfg := &config.Cfg.RabbitMq
|
|
|
|
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
|
|
}
|
|
|
|
func Init() {
|
|
conn, err := amqp.Dial(getConnectionString())
|
|
|
|
if err != nil {
|
|
log.Fatalln("Failed to connect to RabbitMQ", err)
|
|
}
|
|
|
|
Conn = conn
|
|
|
|
ch, err := conn.Channel()
|
|
|
|
if err != nil {
|
|
log.Fatalln("Failed to open a channel", err)
|
|
}
|
|
|
|
//defer ch.Close()
|
|
|
|
Channel = ch
|
|
|
|
err = ch.Qos(
|
|
1, // prefetch count
|
|
0, // prefetch size
|
|
false, // global
|
|
)
|
|
|
|
if err != nil {
|
|
log.Fatalln("Failed to set QoS", err)
|
|
}
|
|
|
|
msgs, err := ch.Consume(
|
|
"mails", // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
|
|
if err != nil {
|
|
log.Fatalln("Consume err", err)
|
|
}
|
|
|
|
forever := make(chan []byte)
|
|
|
|
go func() {
|
|
for d := range msgs {
|
|
log.Debugf("Recieved Message: %s\n", d.Body)
|
|
|
|
log.Info("aa")
|
|
|
|
mail := MailInput{}
|
|
|
|
if err := json.Unmarshal(d.Body, &mail); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
log.Infoln("input", mail)
|
|
|
|
if err = mailer.NewMail([]string{mail.Mail}, mail.TemplateId, mail.LanguageId, mail.BodyData); err == nil {
|
|
d.Ack(false)
|
|
}
|
|
}
|
|
}()
|
|
|
|
log.Info("Successfully Connected to our RabbitMQ Instance")
|
|
log.Debug(" [*] - Waiting for messages")
|
|
<-forever
|
|
}
|