package rabbitmq

import (
	"context"
	"encoding/json"
	"time"

	"clickandjoin.app/websocketserver/modules/structs"
	"clickandjoin.app/websocketserver/modules/utils"
	"clickandjoin.app/websocketserver/socketclients"
	gorabbitmqclient "git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client"
	"github.com/google/uuid"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sirupsen/logrus"
)

// PublishClientMessage JSON-encodes rabbitMqMessage and hands it to
// WebsocketClient.PushExchangeMessage together with rabbitMqMessage.Rec
// (presumably used as the routing key of the receiver; confirm against the
// client library). Failures are logged and returned to the caller.
func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
	msg, err := json.Marshal(rabbitMqMessage)
	if err != nil {
		logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
		return err
	}

	if err = WebsocketClient.PushExchangeMessage(rabbitMqMessage.Rec, msg); err != nil {
		logrus.Errorln("Failed to publish client msg, err:", err)
		return err
	}

	return nil
}

// CreateWSClientBinding creates a queue for the given user in the websocket
// exchange, binds it with id as routing key and starts a goroutine that
// forwards every delivery to socketClient.
//
// The goroutine stops when socketClient.CancelFunc (set here) is called. When
// the AMQP channel reports a close, the queue, binding and consumer are
// re-created; failed attempts are retried after a short delay.
func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err error) {
	msgs, err := createWSClientQueueAndBindingConsumer(socketClient, id)
	if err != nil {
		logrus.Errorln("Failed to create ws client queue and binding consumer, err:", err)
		return err
	}

	// canceled when websocket client disconnects
	ctx, cancel := context.WithCancel(context.Background())
	socketClient.CancelFunc = cancel

	go func() {
		// Pause between re-subscribe attempts. The close-notification channel
		// is closed by the library after an abnormal shutdown, so without a
		// pause a failing re-subscribe would spin at full speed.
		const retryDelay = time.Second

		for {
			select {
			case <-ctx.Done():
				logrus.Println("DONE!")
				return
			case <-WebSocketChannelClosedChannel:
				// This case handles the event of closed channel e.g. abnormal shutdown
				newMsgs, err := createWSClientQueueAndBindingConsumer(socketClient, id)
				if err != nil {
					// The AMQP channel is not ready yet. The next iteration
					// enters this case again because the notification channel
					// has been closed by the library.
					logrus.Errorln("Failed to re-create ws client queue and binding consumer, err:", err)

					select {
					case <-ctx.Done():
						logrus.Println("DONE!")
						return
					case <-time.After(retryDelay):
					}

					continue
				}

				msgs = newMsgs

				logrus.Debugln("Re-set ws client channel", socketClient.RabbitMqConsumerId)

				// Re-set channel to receive notifications
				// The library closes this channel after abnormal shutdown
				// NOTE(review): WebSocketChannelClosedChannel is a package-level
				// variable that every client goroutine reads and reassigns
				// without synchronization; consider a per-client channel.
				WebSocketChannelClosedChannel = make(chan *amqp.Error, 1)
				WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel)
			case msg, ok := <-msgs:
				if !ok {
					// The delivery channel was closed (consumer cancelled or
					// AMQP channel gone). A closed channel yields zero-value
					// deliveries forever, so stop selecting on it until a new
					// consumer has been created.
					msgs = nil
					continue
				}

				var receivedMessage structs.ReceivedMessage

				if err := utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage); err != nil {
					logrus.Errorln("Failed to unmarshal received msg, err:", err)
					// Do not forward a message that could not be decoded.
					continue
				}

				logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage)

				if err := socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}); err != nil {
					logrus.Errorln("Failed to send message to client, err:", err)
				}
			}
		}
	}()

	return nil
}

// createWSClientQueueAndBindingConsumer declares an exclusive, non-durable
// queue with a server-generated name, binds it to the websocket messages
// exchange with id as routing key and starts an auto-ack consumer on it.
//
// As a side effect it stores a freshly generated consumer ID and the queue
// name in socketClient so both can be removed when the client disconnects.
// It returns the delivery channel of the new consumer.
func createWSClientQueueAndBindingConsumer(socketClient *structs.SocketClient, id string) (msgs <-chan amqp.Delivery, err error) {
	q, err := WebsocketClient.QueueDeclare(gorabbitmqclient.QueueDeclareSettings{
		Name:             "",
		Durable:          false,
		DeleteWhenUnused: false,
		Exclusive:        true,
		NoWait:           false,
		Arguments:        nil,
	})
	if err != nil {
		return nil, err
	}

	// set an ID for the consumer to delete the consumer after the socket user has disconnected
	socketClient.RabbitMqConsumerId = uuid.New().String()

	// store queue name in the client to delete it when the client disconnects from the websocket
	socketClient.RabbitMqQueueName = q.Name

	err = WebsocketClient.QueueBind(gorabbitmqclient.QueueBindSettings{
		QueueName:  q.Name,
		RoutingKey: id,
		Exchange:   exchangeWebsocketMessages,
		NoWait:     false,
		Arguments:  nil,
	})
	if err != nil {
		logrus.Errorln("Failed to bind queue, err:", err)
		return nil, err
	}

	return WebsocketClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
		QueueName: q.Name,
		Consumer:  socketClient.RabbitMqConsumerId,
		AutoAck:   true,
		Exclusive: false,
		NoLocal:   false,
		NoWait:    false,
		Arguments: nil,
	})
}

// DeleteWSClient cancels the consumer identified by consumerId and then
// deletes the queue qName on the websocket client's channel. The first
// failure is logged and returned; the queue is not deleted if cancelling the
// consumer fails.
func DeleteWSClient(consumerId string, qName string) error {
	// delete consumer
	err := WebsocketClient.Channel.Cancel(consumerId, false)
	if err != nil {
		logrus.Errorln("Failed to delete consumer, err:", err)
		return err
	}

	// delete queue and queue binding
	// NOTE(review): assuming Channel is an *amqp.Channel, the arguments are
	// ifUnused=false, ifEmpty=true, noWait=false, i.e. the delete is refused
	// when the queue still holds messages - confirm this is intended.
	_, err = WebsocketClient.Channel.QueueDelete(qName, false, true, false)
	if err != nil {
		logrus.Errorln("Failed to delete queue, err:", err)
		return err
	}

	logrus.Println("deleted", consumerId, qName)

	return nil
}

// ApiBroadcastMessagesHandling starts consuming the API broadcast queue and
// launches a goroutine that forwards every delivery to all connected socket
// clients via socketclients.BroadcastMessage.
//
// If the initial consumer cannot be created the error is logged and nothing
// is started. When the API broadcast AMQP channel reports a close, the
// consumer is re-created; failed attempts are retried after a short delay.
// The goroutine has no stop mechanism and runs for the life of the process.
func ApiBroadcastMessagesHandling() {
	msgs, err := apiBroadcastMessagesConsumer()
	if err != nil {
		logrus.Errorln("Failed to create api broadcast message consumer, err:", err)
		return
	}

	channelClosedChannel := make(chan *amqp.Error, 1)
	ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)

	go func() {
		// Pause between re-subscribe attempts; see the retry branch below.
		const retryDelay = time.Second

		for {
			select {
			case <-channelClosedChannel:
				// This case handles the event of closed channel e.g. abnormal shutdown
				newMsgs, err := apiBroadcastMessagesConsumer()
				if err != nil {
					// The AMQP channel is not ready yet. The next iteration
					// enters this case again because the notification channel
					// has been closed by the library, so wait a little
					// instead of spinning.
					logrus.Errorln("Failed to re-create api broadcast message consumer, err:", err)
					time.Sleep(retryDelay)
					continue
				}

				msgs = newMsgs

				logrus.Debugln("Re-set api broadcast channel")

				// Re-set channel to receive notifications
				// The library closes this channel after abnormal shutdown.
				// The notification must be registered on the API broadcast
				// channel, the one this goroutine consumes from.
				channelClosedChannel = make(chan *amqp.Error, 1)
				ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
			case msg, ok := <-msgs:
				if !ok {
					// Delivery channel closed: stop selecting on it until the
					// consumer has been re-created, otherwise zero-value
					// deliveries would be broadcast in a tight loop.
					msgs = nil
					continue
				}

				var receivedMessage structs.ReceivedMessage

				if err := utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage); err != nil {
					logrus.Errorln("Failed to unmarshal received msg, err:", err)
					// Do not broadcast a message that could not be decoded.
					continue
				}

				logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage)

				socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
			}
		}
	}()
}

// apiBroadcastMessagesConsumer starts an auto-ack consumer with a
// server-generated consumer tag on the queue assigned to ApiBroadcastClient
// and returns its delivery channel.
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
	return ApiBroadcastClient.ConsumeChannelMessages(gorabbitmqclient.ChannelConsumeSettings{
		QueueName: ApiBroadcastClient.AssignedQueueName,
		Consumer:  "",
		AutoAck:   true,
		Exclusive: false,
		NoLocal:   false,
		NoWait:    false,
		Arguments: nil,
	})
}