// WebSocketServer/modules/rabbitmq/rabbitmq.go
// 102 lines, 2.5 KiB, Go

package rabbitmq
import (
"time"
"clickandjoin.app/websocketserver/modules/config"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
amqp "github.com/rabbitmq/amqp091-go"
)
// Names of the RabbitMQ exchanges referenced by Init.
const (
// exchangeWebsocketMessages is the name of the exchange configured for
// WebsocketClient in Init.
exchangeWebsocketMessages = "cnj.websocketserver.messages"
// exchangeBroadcastMessages is the exchange that the queue of
// ApiBroadcastClient is bound to in Init.
exchangeBroadcastMessages = "cnj.api.broadcast.messages"
)
// Package-level RabbitMQ state. Every variable below is assigned by Init.
var (
// WebsocketClient is the client created with the exchangeWebsocketMessages
// exchange settings and empty queue settings.
WebsocketClient *gocnjhelper.Client
// ApiBroadcastClient is the client created with empty exchange settings and
// a queue bound to the exchangeBroadcastMessages exchange.
ApiBroadcastClient *gocnjhelper.Client
// ConnectionAddress is the connection string built from the RabbitMq
// username, password and host of the configuration.
ConnectionAddress string
// WebSocketChannelClosedChannel is registered through NotifyClose on the
// channel of WebsocketClient; it informs when the connection has been lost.
WebSocketChannelClosedChannel chan *amqp.Error
)
// Init sets up the package-level RabbitMQ state.
//
// It builds ConnectionAddress from the RabbitMq section of config.Cfg, creates
// WebsocketClient and ApiBroadcastClient, blocks for one second, registers
// WebSocketChannelClosedChannel for close notifications on the websocket
// client's channel, and finally starts ApiBroadcastMessagesHandling in its own
// goroutine.
func Init() {
	rmq := config.Cfg.RabbitMq
	ConnectionAddress = gocnjhelper.GetConnectionString(rmq.Username, rmq.Password, rmq.Host)

	// Both clients use the same delay settings.
	delays := gocnjhelper.Config{
		ReconnectDelay: time.Second,
		ReInitDelay:    time.Second,
		ResendDelay:    5 * time.Second,
	}

	// Websocketserver messages: a durable "direct" exchange that publishes
	// JSON and has no queue settings. Fields not listed keep their zero value
	// (AutoDeleted, Internal, NoWait, Mandatory, Immediate are false and
	// Arguments is nil).
	websocketExchange := gocnjhelper.ExchangeSettings{
		Enabled: true,
		Name:    exchangeWebsocketMessages,
		Type:    "direct",
		Durable: true,
		ExchangePublishSettings: gocnjhelper.ExchangePublishSettings{
			ContentType: gocnjhelper.ContentTypeJson,
		},
	}
	WebsocketClient = gocnjhelper.NewClient(
		websocketExchange,
		gocnjhelper.QueueSettings{},
		gocnjhelper.ChannelQosSettingsDefault,
		delays,
		ConnectionAddress,
	)

	// Api broadcast messages: an exclusive, non-durable queue with an empty
	// name, bound with an empty routing key to the broadcast exchange. Fields
	// not listed keep their zero value (Name and RoutingKey are "", Durable,
	// DeleteWhenUnused and NoWait are false, Arguments is nil).
	broadcastQueue := gocnjhelper.QueueSettings{
		Enabled:   true,
		Exclusive: true,
		QueueBindSettings: gocnjhelper.QueueBindSettings{
			Enabled:   true,
			QueueName: gocnjhelper.SetQueueNameToAutomaticallyAssignedQueueName,
			Exchange:  exchangeBroadcastMessages,
		},
	}
	ApiBroadcastClient = gocnjhelper.NewClient(
		gocnjhelper.ExchangeSettings{},
		broadcastQueue,
		gocnjhelper.ChannelQosSettingsDefault,
		delays,
		ConnectionAddress,
	)

	// Give the connections some time to set up before touching the channel.
	// NOTE(review): presumably NewClient connects in the background; if the
	// channel is not ready after this fixed wait, the NotifyClose call below
	// may operate on an unset channel — confirm against go-cnj-helper.
	time.Sleep(time.Second)

	// Websocketserver messages: get informed when the channel is closed.
	WebSocketChannelClosedChannel = make(chan *amqp.Error, 1)
	WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel)

	// Api broadcast messages: handle incoming messages in the background.
	go ApiBroadcastMessagesHandling()
}