added routing key and changed way of pushing a message

alpha v1.0.5
alex 2023-01-07 20:00:30 +01:00
parent bf159f747c
commit 76448483b9
1 changed files with 8 additions and 4 deletions

View File

@ -258,13 +258,13 @@ func (client *Client) changeChannel(channel *amqp.Channel) {
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (client *Client) PushMessage(data []byte) error {
func (client *Client) PushExchangeMessage(routingKey string, data []byte) error {
if !client.isReady {
return errors.New("failed to push: not connected")
}
for {
err := client.UnsafePush(data)
err := client.UnsafePush(data, routingKey)
if err != nil {
client.logger.Println("Push failed. Retrying...")
@ -290,11 +290,15 @@ func (client *Client) PushMessage(data []byte) error {
}
}
func (client *Client) PushQueueMessage(data []byte) error {
return client.PushExchangeMessage("", data)
}
// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (client *Client) UnsafePush(data []byte) error {
func (client *Client) UnsafePush(data []byte, routingKey string) error {
if !client.isReady {
return errNotConnected
}
@ -307,7 +311,7 @@ func (client *Client) UnsafePush(data []byte) error {
return client.channel.PublishWithContext(
ctx,
client.exchangeSettings.Name, // exchange
"", // routing key
routingKey, // routing key
client.exchangeSettings.ExchangePublishSettings.Mandatory,
client.exchangeSettings.ExchangePublishSettings.Immediate,
amqp.Publishing{