ServerSentEventsServer/modules/rabbitmq/rabbitmq.go

102 lines
2.1 KiB
Go

package rabbitmq
import (
"fmt"
"clickandjoin.app/serversenteventsserver/modules/cache"
"clickandjoin.app/serversenteventsserver/modules/config"
"clickandjoin.app/serversenteventsserver/modules/structs"
"clickandjoin.app/serversenteventsserver/modules/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
var Conn *amqp.Connection
var Channel *amqp.Channel
const exchangeBroadcastMessages = "cnj.api.broadcast.messages"
func getConnectionString() string {
cfg := &config.Cfg.RabbitMq
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
}
func Init() {
conn, err := amqp.Dial(getConnectionString())
if err != nil {
logrus.Fatalln("RabbitMQ connection failed, err:", err)
}
ch, err := conn.Channel()
if err != nil {
logrus.Fatalln(err)
}
Channel = ch
/*
* api broadcast messages
*/
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeBroadcastMessages, // exchange
false,
nil,
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to register consumer, err:", err)
}
go func() {
for d := range msgs {
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)
logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS")
for id, sseClient := range cache.SSEClients {
sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(d.Body)}
}
}
}()
}