WebSocketServer/modules/rabbitmq/helper.go

221 lines
6.0 KiB
Go

package rabbitmq
import (
"context"
"encoding/json"
"clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/modules/utils"
"clickandjoin.app/websocketserver/socketclients"
gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage)
if err != nil {
logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
return err
}
err = WebsocketClient.PushExchangeMessage(rabbitMqMessage.Rec, msg)
if err != nil {
logrus.Errorln("Failed to publish client msg, err:", err)
return err
}
return nil
}
// For each user a queue is created in the websocket exchange channel where he receives his messages
func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err error) {
msgs, err := createWSClientQueueAndBindingConsumer(socketClient, id)
if err != nil {
logrus.Errorln("Failed to create ws client queue and binding consumer, err:", err)
return err
}
// canceled when websocket client disconnects
ctx, cancel := context.WithCancel(context.Background())
socketClient.CancelFunc = cancel
go func() {
for {
select {
case <-ctx.Done():
logrus.Println("DONE!")
return
case <-WebSocketChannelClosedChannel:
// This case handles the event of closed channel e.g. abnormal shutdown
msgs, err = createWSClientQueueAndBindingConsumer(socketClient, id)
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
continue
}
logrus.Debugln("Re-set ws client channel", socketClient.RabbitMqConsumerId)
// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
WebSocketChannelClosedChannel = make(chan *amqp.Error, 1)
WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel)
case msg := <-msgs:
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage)
err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
if err != nil {
logrus.Errorln("Failed to send message to client, err:", err)
}
}
}
}()
return nil
}
func createWSClientQueueAndBindingConsumer(socketClient *structs.SocketClient, id string) (msgs <-chan amqp.Delivery, err error) {
q, err := WebsocketClient.QueueDeclare(gorabbitmqclient.QueueDeclareSettings{
Name: "",
Durable: false,
DeleteWhenUnused: false,
Exclusive: true,
NoWait: false,
Arguments: nil,
})
if err != nil {
return msgs, err
}
// set an ID for the consumer to delete the consumer after the socket user has disconnected
socketClient.RabbitMqConsumerId = uuid.New().String()
// store queue name in the client to delete it when the client disconnects from the websocket
socketClient.RabbitMqQueueName = q.Name
err = WebsocketClient.QueueBind(gorabbitmqclient.QueueBindSettings{
QueueName: q.Name,
RoutingKey: id,
Exchange: exchangeWebsocketMessages,
NoWait: false,
Arguments: nil,
})
if err != nil {
logrus.Errorln("Failed to bind queue, err:", err)
return msgs, err
}
return WebsocketClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
QueueName: q.Name,
Consumer: socketClient.RabbitMqConsumerId,
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
Arguments: nil,
})
}
func DeleteWSClient(consumerId string, qName string) error {
// delete consumer
err := WebsocketClient.Channel.Cancel(consumerId, false)
if err != nil {
logrus.Errorln("Failed to delete consumer, err:", err)
return err
}
// delete queue and queue binding
_, err = WebsocketClient.Channel.QueueDelete(qName, false, true, false)
if err != nil {
logrus.Errorln("Failed to delete queue, err:", err)
return err
}
logrus.Println("deleted", consumerId, qName)
return nil
}
func ApiBroadcastMessagesHandling() {
msgs, err := apiBroadcastMessagesConsumer()
if err != nil {
logrus.Errorln("Failed to create api broadcast message consumer, err:", err)
return
}
channelClosedChannel := make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
go func() {
for {
select {
case <-channelClosedChannel:
// This case handles the event of closed channel e.g. abnormal shutdown
msgs, err = apiBroadcastMessagesConsumer()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
continue
}
logrus.Debugln("Re-set api broadcast channel")
// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
channelClosedChannel = make(chan *amqp.Error, 1)
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
case msg := <-msgs:
var receivedMessage structs.ReceivedMessage
err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)
socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
}
}
}()
}
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
return ApiBroadcastClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
QueueName: ApiBroadcastClient.AssignedQueueName,
Consumer: "",
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
Arguments: nil,
})
}