parent
76448483b9
commit
1cb47904e2
18
client.go
18
client.go
|
@ -85,7 +85,7 @@ type Client struct {
|
|||
config Config
|
||||
logger *log.Logger
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
Channel *amqp.Channel
|
||||
done chan bool
|
||||
notifyConnClose chan *amqp.Error
|
||||
notifyChanClose chan *amqp.Error
|
||||
|
@ -246,11 +246,11 @@ func (client *Client) changeConnection(connection *amqp.Connection) {
|
|||
// changeChannel takes a new channel to the queue,
|
||||
// and updates the channel listeners to reflect this.
|
||||
func (client *Client) changeChannel(channel *amqp.Channel) {
|
||||
client.channel = channel
|
||||
client.Channel = channel
|
||||
client.notifyChanClose = make(chan *amqp.Error, 1)
|
||||
client.notifyConfirm = make(chan amqp.Confirmation, 1)
|
||||
client.channel.NotifyClose(client.notifyChanClose)
|
||||
client.channel.NotifyPublish(client.notifyConfirm)
|
||||
client.Channel.NotifyClose(client.notifyChanClose)
|
||||
client.Channel.NotifyPublish(client.notifyConfirm)
|
||||
}
|
||||
|
||||
// Push will push data onto the queue, and wait for a confirm.
|
||||
|
@ -308,7 +308,7 @@ func (client *Client) UnsafePush(data []byte, routingKey string) error {
|
|||
|
||||
// exchange
|
||||
if client.exchangeSettings.Name != "" {
|
||||
return client.channel.PublishWithContext(
|
||||
return client.Channel.PublishWithContext(
|
||||
ctx,
|
||||
client.exchangeSettings.Name, // exchange
|
||||
routingKey, // routing key
|
||||
|
@ -321,7 +321,7 @@ func (client *Client) UnsafePush(data []byte, routingKey string) error {
|
|||
}
|
||||
|
||||
// queue
|
||||
return client.channel.PublishWithContext(
|
||||
return client.Channel.PublishWithContext(
|
||||
ctx,
|
||||
"", // exchange
|
||||
client.queueSettings.Name, // routing key
|
||||
|
@ -344,7 +344,7 @@ func (client *Client) ConsumeQueueMessages() (<-chan amqp.Delivery, error) {
|
|||
return nil, errNotConnected
|
||||
}
|
||||
|
||||
if err := client.channel.Qos(
|
||||
if err := client.Channel.Qos(
|
||||
1,
|
||||
0,
|
||||
false,
|
||||
|
@ -352,7 +352,7 @@ func (client *Client) ConsumeQueueMessages() (<-chan amqp.Delivery, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return client.channel.Consume(
|
||||
return client.Channel.Consume(
|
||||
client.queueSettings.Name,
|
||||
"",
|
||||
client.queueSettings.QueueConsumeSettings.AutoAck,
|
||||
|
@ -371,7 +371,7 @@ func (client *Client) Close() error {
|
|||
|
||||
close(client.done)
|
||||
|
||||
err := client.channel.Close()
|
||||
err := client.Channel.Close()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue