added queue binding

alpha v1.0.12
alex 2023-01-07 21:30:10 +01:00
parent 33baab20dd
commit e65a271af0
1 changed files with 24 additions and 10 deletions

View File

@ -22,6 +22,8 @@ const (
DeliveryModePersistent = amqp.Persistent
ContentTypeJson = "application/json"
SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName"
)
type Config struct {
@ -62,6 +64,7 @@ type QueueSettings struct {
Arguments amqp.Table
QueuePublishSettings QueuePublishSettings
QueueConsumeSettings QueueConsumeSettings
QueueBindSettings QueueBindSettings
}
type QueuePublishSettings struct {
@ -81,6 +84,7 @@ type QueueConsumeSettings struct {
}
type QueueBindSettings struct {
Enabled bool
QueueName string
RoutingKey string
Exchange string
@ -236,6 +240,26 @@ func (client *Client) init(conn *amqp.Connection) error {
return err
}
if client.queueSettings.QueueBindSettings.Enabled {
queueBindName := client.queueSettings.QueueBindSettings.QueueName
if client.queueSettings.QueueBindSettings.QueueName == SetQueueNameToAutomaticallyAssignedQueueName {
queueBindName = q.Name
}
err = ch.QueueBind(
queueBindName,
client.queueSettings.QueueBindSettings.RoutingKey,
client.queueSettings.QueueBindSettings.Exchange,
client.queueSettings.QueueBindSettings.NoWait,
client.queueSettings.QueueBindSettings.Arguments,
)
if err != nil {
return err
}
}
client.AssignedQueueName = q.Name
client.changeChannel(ch)
@ -247,16 +271,6 @@ func (client *Client) init(conn *amqp.Connection) error {
return nil
}
func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error {
return client.Channel.QueueBind(
queueBindSettings.QueueName,
queueBindSettings.RoutingKey,
queueBindSettings.Exchange,
queueBindSettings.NoWait,
queueBindSettings.Arguments,
)
}
// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {