WebSocketServer/modules/rabbitmq/rabbitmq.go

301 lines
6.6 KiB
Go

package rabbitmq
import (
"context"
"encoding/json"
"fmt"
"time"
"clickandjoin.app/websocketserver/modules/config"
"clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/modules/utils"
"clickandjoin.app/websocketserver/socketclients"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
var Conn *amqp.Connection
var Channel *amqp.Channel
const (
exchangeWebsocketMessages = "cnj.websocketserver.messages"
exchangeBroadcastMessages = "cnj.api.broadcast.messages"
)
func getConnectionString() string {
cfg := &config.Cfg.RabbitMq
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
}
func Init() {
conn, err := amqp.Dial(getConnectionString())
if err != nil {
logrus.Fatalln("RabbitMQ connection failed, err:", err)
}
ch, err := conn.Channel()
if err != nil {
logrus.Fatalln(err)
}
Channel = ch
/*
* websocketserver messages
*/
// creates a new exchange if one does not already exist
err = ch.ExchangeDeclare(
exchangeWebsocketMessages, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare exchange, err:", err)
}
/*
* api broadcast messages
*/
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeBroadcastMessages, // exchange
false,
nil,
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to register consumer, err:", err)
}
go func() {
for d := range msgs {
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)
socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
}
}()
}
func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueName string, err error) {
q, err := Channel.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Errorln("Failed to declare queue, err:", err)
return "", err
}
err = Channel.QueueBind(
q.Name, // name
id, // key
exchangeWebsocketMessages, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Errorln("Failed to bind queue, err:", err)
return "", err
}
// set an ID for the consumer to delete the consumer after the socket user has disconnected
socketClient.RabbitMqConsumerId = uuid.New().String()
msgs, err := Channel.Consume(
q.Name, // queue
socketClient.RabbitMqConsumerId, // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
logrus.Errorln("Failed to register consumer, err:", err)
return "", err
}
go func() {
for d := range msgs {
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage)
err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
if err != nil {
logrus.Errorln("Failed to send message to client, err:", err)
}
}
}()
return q.Name, nil
}
func DeleteClient(consumerId string, qName string) error {
// delete consumer
err := Channel.Cancel(consumerId, false)
if err != nil {
logrus.Errorln("Failed to delete consumer, err:", err)
return err
}
// delete queue and queue binding
_, err = Channel.QueueDelete(qName, false, true, false)
if err != nil {
logrus.Errorln("Failed to delete queue, err:", err)
return err
}
return nil
}
func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage)
if err != nil {
logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = Channel.PublishWithContext(ctx,
exchangeWebsocketMessages, // exchange
rabbitMqMessage.Rec, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: msg,
})
if err != nil {
logrus.Errorln("Failed to publish client msg, err:", err)
return err
}
return nil
}
/*
func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage)
if err != nil {
logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
return err
}
err = publishMessage(msg, MessagesQueue.Name)
return err
}
func publishMessage(body []byte, channelName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := Channel.PublishWithContext(ctx,
"messages", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
if err != nil {
logrus.Errorln("Failed to publish a message, err:", err)
return err
}
logrus.Printf("[x] Sent %s\n", body)
return nil
} */
/*
func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) {
q, err := channel.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalln("Failed to declare a queue", err)
}
*queue = q
}
*/