236 lines
6.8 KiB
Go
236 lines
6.8 KiB
Go
package rabbitmq
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"clickandjoin.app/websocketserver/modules/cache"
|
|
"clickandjoin.app/websocketserver/modules/structs"
|
|
"clickandjoin.app/websocketserver/modules/utils"
|
|
"clickandjoin.app/websocketserver/socketclients"
|
|
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
|
|
"github.com/google/uuid"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
|
|
msg, err := json.Marshal(rabbitMqMessage)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to marshal rabbitMqMessage, err: %S", err)
|
|
return err
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("SENT MSG: %s %s", rabbitMqMessage.Rec, string(msg))
|
|
|
|
err = WebsocketClient.PushExchangeMessage(rabbitMqMessage.Rec, msg)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to publish client msg, err: %s", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// For each user a queue is created in the websocket exchange channel where he receives his messages
|
|
func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err error) {
|
|
msgs, err := createWSClientQueueAndBindingConsumer(socketClient, id)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to create ws client queue and binding consumer, err: %s", err)
|
|
return err
|
|
}
|
|
|
|
// canceled when websocket client disconnects
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
socketClient.CancelFunc = cancel
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
gocnjhelper.LogDebug("DONE!")
|
|
return
|
|
case <-WebSocketChannelClosedChannel:
|
|
// This case handles the event of closed channel e.g. abnormal shutdown
|
|
msgs, err = createWSClientQueueAndBindingConsumer(socketClient, id)
|
|
|
|
if err != nil {
|
|
// If the AMQP channel is not ready, it will continue the loop. Next
|
|
// iteration will enter this case because chClosedCh is closed by the
|
|
// library
|
|
continue
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("Re-set ws client channel %s", socketClient.RabbitMqConsumerId)
|
|
|
|
// Re-set channel to receive notifications
|
|
// The library closes this channel after abnormal shutdown
|
|
WebSocketChannelClosedChannel = make(chan *amqp.Error, 1)
|
|
WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel)
|
|
|
|
case msg := <-msgs:
|
|
var receivedMessage structs.ReceivedMessage
|
|
|
|
gocnjhelper.LogDebugf("RABBITMQ RECEIVED MESSAGE: %s", msg.Body)
|
|
|
|
if err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage); err != nil {
|
|
break
|
|
}
|
|
|
|
if receivedMessage.Cmd == utils.RabbitMqWsCmdInternalDeleteWebSocketSession {
|
|
gocnjhelper.LogDebugf("rabbitmq here %s", fmt.Sprintf("%v", receivedMessage.Body))
|
|
|
|
if socketClient, ok := cache.GetSocketClient(fmt.Sprintf("%v", receivedMessage.Body)); ok {
|
|
gocnjhelper.LogDebugf("rabbitmq del socketClient %s", socketClient)
|
|
cache.DeleteClient(fmt.Sprintf("%v", receivedMessage.Body))
|
|
socketClient.SendCloseMessage()
|
|
socketClient.CancelFunc()
|
|
DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName)
|
|
} else {
|
|
gocnjhelper.LogDebug("rabbitmq del socketclient else not found")
|
|
}
|
|
} else {
|
|
err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to send message to client, err: %s", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func createWSClientQueueAndBindingConsumer(socketClient *structs.SocketClient, id string) (msgs <-chan amqp.Delivery, err error) {
|
|
q, err := WebsocketClient.QueueDeclare(gocnjhelper.QueueDeclareSettings{
|
|
Name: "",
|
|
Durable: false,
|
|
DeleteWhenUnused: false,
|
|
Exclusive: true,
|
|
NoWait: false,
|
|
Arguments: nil,
|
|
})
|
|
|
|
if err != nil {
|
|
return msgs, err
|
|
}
|
|
|
|
// set an ID for the consumer to delete the consumer after the socket user has disconnected
|
|
socketClient.RabbitMqConsumerId = uuid.New().String()
|
|
|
|
// store queue name in the client to delete it when the client disconnects from the websocket
|
|
socketClient.RabbitMqQueueName = q.Name
|
|
|
|
err = WebsocketClient.QueueBind(gocnjhelper.QueueBindSettings{
|
|
QueueName: q.Name,
|
|
RoutingKey: id,
|
|
Exchange: exchangeWebsocketMessages,
|
|
NoWait: false,
|
|
Arguments: nil,
|
|
})
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to bind queue, err: %s", err)
|
|
return msgs, err
|
|
}
|
|
|
|
return WebsocketClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
|
|
QueueName: q.Name,
|
|
Consumer: socketClient.RabbitMqConsumerId,
|
|
AutoAck: true,
|
|
Exclusive: false,
|
|
NoLocal: false,
|
|
NoWait: false,
|
|
Arguments: nil,
|
|
})
|
|
}
|
|
|
|
func DeleteWSClient(consumerId string, qName string) error {
|
|
// delete consumer
|
|
err := WebsocketClient.Channel.Cancel(consumerId, false)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to delete consumer, err: %s", err)
|
|
return err
|
|
}
|
|
|
|
// delete queue and queue binding
|
|
_, err = WebsocketClient.Channel.QueueDelete(qName, false, true, false)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to delete queue, err: %s", err)
|
|
return err
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("deleted %s %s", consumerId, qName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func ApiBroadcastMessagesHandling() {
|
|
msgs, err := apiBroadcastMessagesConsumer()
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to create api broadcast message consumer, err: %s", err)
|
|
return
|
|
}
|
|
|
|
channelClosedChannel := make(chan *amqp.Error, 1)
|
|
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-channelClosedChannel:
|
|
// This case handles the event of closed channel e.g. abnormal shutdown
|
|
msgs, err = apiBroadcastMessagesConsumer()
|
|
|
|
if err != nil {
|
|
// If the AMQP channel is not ready, it will continue the loop. Next
|
|
// iteration will enter this case because chClosedCh is closed by the
|
|
// library
|
|
continue
|
|
}
|
|
|
|
gocnjhelper.LogDebug("Re-set api broadcast channel")
|
|
|
|
// Re-set channel to receive notifications
|
|
// The library closes this channel after abnormal shutdown
|
|
channelClosedChannel = make(chan *amqp.Error, 1)
|
|
ApiBroadcastClient.Channel.NotifyClose(channelClosedChannel)
|
|
|
|
case msg := <-msgs:
|
|
var receivedMessage structs.ReceivedMessage
|
|
|
|
err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err)
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("RABBITMQ RECEIVED BROADCAST MESSAGE: %s", receivedMessage)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func apiBroadcastMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
|
|
return ApiBroadcastClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
|
|
QueueName: ApiBroadcastClient.AssignedQueueName,
|
|
Consumer: "",
|
|
AutoAck: true,
|
|
Exclusive: false,
|
|
NoLocal: false,
|
|
NoWait: false,
|
|
Arguments: nil,
|
|
})
|
|
}
|