// Package rabbitmq creates the RabbitMQ clients used by the websocket server.
package rabbitmq

import (
	"time"

	"clickandjoin.app/websocketserver/modules/config"
	gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Names of the exchanges this package works with.
const (
	exchangeWebsocketMessages = "cnj.websocketserver.messages"
	exchangeBroadcastMessages = "cnj.api.broadcast.messages"
)

var (
	// WebsocketClient is the client configured with the
	// exchangeWebsocketMessages exchange. It is created by Init.
	WebsocketClient *gocnjhelper.Client

	// ApiBroadcastClient is the client whose exclusive queue is bound to the
	// exchangeBroadcastMessages exchange. It is created by Init.
	ApiBroadcastClient *gocnjhelper.Client

	// ConnectionAddress is the connection string built from the RabbitMQ
	// section of the configuration. It is set by Init.
	ConnectionAddress string

	// WebSocketChannelClosedChannel is registered via NotifyClose on
	// WebsocketClient.Channel in Init; it informs when the connection has
	// been lost.
	WebSocketChannelClosedChannel chan *amqp.Error
)

// Init builds the connection string from config.Cfg, creates both RabbitMQ
// clients, waits one second, registers the close notification channel on the
// websocket client's channel and finally starts ApiBroadcastMessagesHandling
// in its own goroutine.
//
// NOTE(review): WebsocketClient.Channel is used after a fixed one-second wait;
// whether the helper guarantees it is ready by then is not visible here.
func Init() {
	rabbitCfg := config.Cfg.RabbitMq

	ConnectionAddress = gocnjhelper.GetConnectionString(rabbitCfg.Username, rabbitCfg.Password, rabbitCfg.Host)

	// Both clients use the same delays.
	delays := gocnjhelper.Config{
		ReconnectDelay: time.Second,
		ReInitDelay:    time.Second,
		ResendDelay:    5 * time.Second,
	}

	// websocketserver messages: exchange only, no queue.
	// All fields not listed intentionally stay at their zero value.
	websocketExchange := gocnjhelper.ExchangeSettings{
		Enabled: true,
		Name:    exchangeWebsocketMessages,
		Type:    "direct",
		Durable: true,
		ExchangePublishSettings: gocnjhelper.ExchangePublishSettings{
			ContentType: gocnjhelper.ContentTypeJson,
		},
	}

	WebsocketClient = gocnjhelper.NewClient(
		websocketExchange,
		gocnjhelper.QueueSettings{},
		gocnjhelper.ChannelQosSettingsDefault,
		delays,
		ConnectionAddress,
	)

	// api broadcast messages: queue only, no exchange declaration.
	// The queue name and the routing key are left empty; the binding refers
	// to the queue through SetQueueNameToAutomaticallyAssignedQueueName
	// (presumably the automatically assigned name - confirm in go-cnj-helper).
	// All fields not listed intentionally stay at their zero value.
	broadcastQueue := gocnjhelper.QueueSettings{
		Enabled:   true,
		Exclusive: true,
		QueueBindSettings: gocnjhelper.QueueBindSettings{
			Enabled:   true,
			QueueName: gocnjhelper.SetQueueNameToAutomaticallyAssignedQueueName,
			Exchange:  exchangeBroadcastMessages,
		},
	}

	ApiBroadcastClient = gocnjhelper.NewClient(
		gocnjhelper.ExchangeSettings{},
		broadcastQueue,
		gocnjhelper.ChannelQosSettingsDefault,
		delays,
		ConnectionAddress,
	)

	// Give the connections some time to set up before touching the channel.
	time.Sleep(time.Second)

	// websocketserver messages: buffer of one for the close notification.
	WebSocketChannelClosedChannel = make(chan *amqp.Error, 1)
	WebsocketClient.Channel.NotifyClose(WebSocketChannelClosedChannel)

	// api broadcast messages: handled in a separate goroutine.
	go ApiBroadcastMessagesHandling()
}