go-cnj-helper/rabbitmqclient.go

482 lines
12 KiB
Go

package gocnjhelper
import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/rs/zerolog"
)
// Sentinel errors returned by Client methods.
var (
// errNotConnected is returned by UnsafePush, QueueDeclare, QueueBind and
// ConsumeChannelMessages when the client is not ready.
errNotConnected = errors.New("not connected to a server")
// errAlreadyClosed is returned by Close when the client is not ready.
errAlreadyClosed = errors.New("already closed: not connected to the server")
// errShutdown is returned by PushExchangeMessage when the client is closed
// while it is waiting to retry a failed publish.
errShutdown = errors.New("client is shutting down")
)
// Convenience constants for filling in the settings structs of this package.
const (
// DeliveryModeTransient re-exports amqp.Transient.
DeliveryModeTransient = amqp.Transient
// DeliveryModePersistent re-exports amqp.Persistent.
DeliveryModePersistent = amqp.Persistent
// ContentTypeJson is the JSON MIME type, usable as a publish ContentType.
ContentTypeJson = "application/json"
// SetQueueNameToAutomaticallyAssignedQueueName is a placeholder value for
// QueueBindSettings.QueueName. During client initialization it is replaced
// by the name returned from the queue declaration.
SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName"
)
// Config holds the retry delays used by Client.
type Config struct {
// ReconnectDelay is the wait between failed connection attempts.
ReconnectDelay time.Duration
// ReInitDelay is the wait between failed channel initialization attempts.
ReInitDelay time.Duration
// ResendDelay is the wait before a publish is retried, both after a failed
// publish and after a publish that was not confirmed in time.
ResendDelay time.Duration
}
// ConfigDefault is a ready-made Config: 5s between reconnect attempts,
// 2s between channel re-initialization attempts and 5s between resends.
var ConfigDefault = Config{
ReconnectDelay: 5 * time.Second,
ReInitDelay: 2 * time.Second,
ResendDelay: 5 * time.Second,
}
// ChannelQosSettingsDefault is a ready-made ChannelQosSettings with a
// prefetch count of 1, no prefetch size limit (0) and Global disabled.
var ChannelQosSettingsDefault = ChannelQosSettings{
PrefetchCount: 1,
PrefetchSize: 0,
Global: false,
}
// ExchangeSettings describes the exchange a Client declares and publishes to.
// Name through Arguments are forwarded, in order, to Channel.ExchangeDeclare
// during client initialization.
type ExchangeSettings struct {
// Enabled controls whether the exchange is declared during initialization.
Enabled bool
// Name is the exchange name. UnsafePush publishes to this exchange whenever
// Name is non-empty, regardless of Enabled.
Name string
Type string
Durable bool
AutoDeleted bool
Internal bool
NoWait bool
Arguments amqp.Table
// ExchangePublishSettings are the publish options used for this exchange.
ExchangePublishSettings ExchangePublishSettings
}
// ExchangePublishSettings holds the options UnsafePush applies when it
// publishes to the configured exchange.
type ExchangePublishSettings struct {
// Mandatory and Immediate are passed as the publish flags of the same name.
Mandatory bool
Immediate bool
// ContentType is set on the published message.
ContentType string
}
// QueueSettings describes the queue a Client declares during initialization.
// Name through Arguments are forwarded, in order, to Channel.QueueDeclare.
type QueueSettings struct {
// Enabled controls whether the queue is declared (and optionally bound)
// during initialization.
Enabled bool
// Name is the queue name. UnsafePush also uses it as the routing key when
// no exchange name is configured.
Name string
Durable bool
// DeleteWhenUnused is passed as the third argument of Channel.QueueDeclare.
DeleteWhenUnused bool
Exclusive bool
NoWait bool
Arguments amqp.Table
// QueuePublishSettings are the publish options used when publishing
// without an exchange name.
QueuePublishSettings QueuePublishSettings
// QueueBindSettings optionally binds the declared queue during
// initialization.
QueueBindSettings QueueBindSettings
}
// QueuePublishSettings holds the options UnsafePush applies when it publishes
// with an empty exchange name and the queue name as routing key.
type QueuePublishSettings struct {
// Mandatory and Immediate are passed as the publish flags of the same name.
Mandatory bool
Immediate bool
// DeliveryMode is set on the published message; see DeliveryModeTransient
// and DeliveryModePersistent.
DeliveryMode uint8
// ContentType is set on the published message.
ContentType string
}
// ChannelConsumeSettings holds the arguments that ConsumeChannelMessages
// forwards, in field order, to Channel.Consume.
type ChannelConsumeSettings struct {
// QueueName is the queue to consume from.
QueueName string
// Consumer is the consumer tag passed to Channel.Consume.
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Arguments amqp.Table
}
// QueueDeclareSettings holds the arguments that Client.QueueDeclare forwards,
// in field order, to Channel.QueueDeclare.
type QueueDeclareSettings struct {
Name string
Durable bool
// DeleteWhenUnused is passed as the third argument of Channel.QueueDeclare.
DeleteWhenUnused bool
Exclusive bool
NoWait bool
Arguments amqp.Table
}
// QueueBindSettings holds the arguments for Channel.QueueBind. It is used
// both by client initialization (nested in QueueSettings) and by
// Client.QueueBind.
type QueueBindSettings struct {
// Enabled controls whether initialization binds the declared queue.
// Client.QueueBind does not read this field.
Enabled bool
// QueueName is the queue to bind. During initialization the value
// SetQueueNameToAutomaticallyAssignedQueueName is replaced by the declared
// queue's name; Client.QueueBind passes the value through unchanged.
QueueName string
RoutingKey string
Exchange string
NoWait bool
Arguments amqp.Table
}
// ChannelQosSettings holds the arguments passed, in field order, to
// Channel.Qos during client initialization.
type ChannelQosSettings struct {
PrefetchCount int
PrefetchSize int
Global bool
}
// Client wraps an AMQP connection and channel and keeps them alive: a
// background goroutine started by NewClient reconnects and re-initializes
// the channel when either is closed.
type Client struct {
// Settings captured by NewClient and applied on every (re-)initialization.
exchangeSettings ExchangeSettings
queueSettings QueueSettings
channelQosSettings ChannelQosSettings
AssignedQueueName string // automatically assigned queue name if none is specified
config Config
logger zerolog.Logger
// connection and Channel are replaced each time the client reconnects or
// re-initializes.
connection *amqp.Connection
Channel *amqp.Channel
// done is closed by Close to stop the background goroutine and pending
// publish retries.
done chan bool
// Notification channels registered on the current connection/channel.
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
// isReady is true only while an initialized channel is available.
isReady bool
}
// GetConnectionString builds an AMQP URI of the form
// "amqp://username:password@host/".
//
// username and password must be passed raw (not pre-escaped): they are
// percent-encoded here so that characters such as '@', '/', ':' or '?' cannot
// corrupt the URI structure. host is inserted verbatim and may include a port.
func GetConnectionString(username string, password string, host string) string {
	credentials := url.UserPassword(username, password).String()
	return fmt.Sprintf("amqp://%s@%s/", credentials, host)
}
// NewClient builds a Client from the given settings and immediately starts a
// background goroutine that connects to addr and keeps the connection alive.
//
// The returned client is not necessarily connected yet; methods report a
// not-connected error until the first initialization has succeeded.
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
	client := new(Client)
	client.logger = zerolog.New(os.Stdout).With().Logger()
	client.exchangeSettings = exchangeSettings
	client.queueSettings = queueSettings
	client.channelQosSettings = channelQosSettings
	client.config = config
	client.done = make(chan bool)

	go client.handleReconnect(addr)
	return client
}
// handleReconnect is the connection supervisor loop. It dials addr, hands the
// connection to handleReInit, and dials again whenever handleReInit reports
// that the connection was lost. Failed dials are retried after
// config.ReconnectDelay. The loop ends once client.done is closed.
func (client *Client) handleReconnect(addr string) {
	for {
		client.isReady = false
		client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Attempting to connect")

		conn, err := client.connect(addr)
		if err == nil {
			// handleReInit blocks for the lifetime of the connection and
			// returns true only when the client is shutting down.
			if stop := client.handleReInit(conn); stop {
				return
			}
			continue
		}

		client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to connect. Retrying...")
		select {
		case <-client.done:
			return
		case <-time.After(client.config.ReconnectDelay):
		}
	}
}
// connect dials addr and, on success, installs the new connection on the
// client (including its close listener) before returning it.
func (client *Client) connect(addr string) (*amqp.Connection, error) {
	dialed, dialErr := amqp.Dial(addr)
	if dialErr != nil {
		return nil, dialErr
	}

	client.changeConnection(dialed)
	client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connected!")
	return dialed, nil
}
// handleReInit initializes a channel on conn and then waits for a close
// notification, re-running init whenever only the channel was closed.
// Failed initializations are retried after config.ReInitDelay.
//
// It returns true when client.done has been closed (the caller must stop)
// and false when the connection was closed (the caller must reconnect).
func (client *Client) handleReInit(conn *amqp.Connection) bool {
	for {
		client.isReady = false

		if err := client.init(conn); err != nil {
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to initialize channel. Retrying...")
			select {
			case <-client.done:
				return true
			case <-client.notifyConnClose:
				// init cannot succeed on a closed connection; without this
				// case the loop would retry forever instead of reconnecting.
				client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
				return false
			case <-time.After(client.config.ReInitDelay):
			}
			continue
		}

		select {
		case <-client.done:
			return true
		case <-client.notifyConnClose:
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
			return false
		case <-client.notifyChanClose:
			// Only the channel is gone: loop around and build a new one on
			// the same connection.
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Channel closed. Re-running init...")
		}
	}
}
// init opens a channel on conn and prepares it for use: publisher confirms
// are enabled, QoS is applied, and the exchange, queue and queue binding are
// declared when enabled in the client's settings. On success the channel is
// installed on the client, AssignedQueueName is updated (if a queue was
// declared) and the client is marked ready.
//
// On any failure the freshly opened channel is closed again and the error of
// the failing step is returned.
func (client *Client) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}

	// init is retried in a loop by handleReInit, so a channel that was opened
	// but could not be fully set up must not be left behind on the connection.
	installed := false
	defer func() {
		if !installed {
			// Best effort: the broker may already have closed the channel as
			// part of the failure, so the close error is not actionable.
			_ = ch.Close()
		}
	}()

	if err = ch.Confirm(false); err != nil {
		return err
	}

	qos := client.channelQosSettings
	if err = ch.Qos(qos.PrefetchCount, qos.PrefetchSize, qos.Global); err != nil {
		return err
	}

	if ex := client.exchangeSettings; ex.Enabled {
		err = ch.ExchangeDeclare(
			ex.Name,
			ex.Type,
			ex.Durable,
			ex.AutoDeleted,
			ex.Internal,
			ex.NoWait,
			ex.Arguments,
		)
		if err != nil {
			return err
		}
	}

	if qs := client.queueSettings; qs.Enabled {
		declared, declareErr := ch.QueueDeclare(
			qs.Name,
			qs.Durable,
			qs.DeleteWhenUnused,
			qs.Exclusive,
			qs.NoWait,
			qs.Arguments,
		)
		if declareErr != nil {
			return declareErr
		}

		if bind := qs.QueueBindSettings; bind.Enabled {
			// The placeholder stands for "whatever name the declaration
			// returned", which matters for server-named queues.
			bindName := bind.QueueName
			if bindName == SetQueueNameToAutomaticallyAssignedQueueName {
				bindName = declared.Name
			}
			err = ch.QueueBind(
				bindName,
				bind.RoutingKey,
				bind.Exchange,
				bind.NoWait,
				bind.Arguments,
			)
			if err != nil {
				return err
			}
		}
		client.AssignedQueueName = declared.Name
	}

	client.changeChannel(ch)
	installed = true
	client.isReady = true
	client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Setup!")
	return nil
}
// changeConnection stores connection on the client and registers a fresh,
// buffered close-notification channel on it.
func (client *Client) changeConnection(connection *amqp.Connection) {
	client.connection = connection
	// NotifyClose hands back the channel it was given.
	client.notifyConnClose = connection.NotifyClose(make(chan *amqp.Error, 1))
}
// changeChannel stores channel on the client and registers fresh, buffered
// listeners on it for channel closure and for publish confirmations.
func (client *Client) changeChannel(channel *amqp.Channel) {
	closeEvents := make(chan *amqp.Error, 1)
	confirmEvents := make(chan amqp.Confirmation, 1)

	client.Channel = channel
	client.notifyChanClose = closeEvents
	client.notifyConfirm = confirmEvents

	channel.NotifyClose(closeEvents)
	channel.NotifyPublish(confirmEvents)
}
// PushExchangeMessage publishes data with the given routing key and blocks
// until the server acknowledges it.
//
// The message is published via UnsafePush. If publishing fails, it is retried
// after config.ResendDelay. If no positive confirmation arrives within
// config.ResendDelay (or a negative one arrives), the message is published
// again. It returns an error immediately when the client is not ready, and
// errShutdown when the client is closed while waiting to retry a failed
// publish.
func (client *Client) PushExchangeMessage(routingKey string, data []byte) error {
	if !client.isReady {
		return errors.New("failed to push: not connected")
	}

	for {
		if pushErr := client.UnsafePush(data, routingKey); pushErr != nil {
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push failed. Retrying...")
			select {
			case <-client.done:
				return errShutdown
			case <-time.After(client.config.ResendDelay):
				continue
			}
		}

		select {
		case confirmation := <-client.notifyConfirm:
			if confirmation.Ack {
				client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push confirmed!")
				return nil
			}
		case <-time.After(client.config.ResendDelay):
		}
		client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push didn't confirm. Retrying...")
	}
}
// PushQueueMessage is PushExchangeMessage with an empty routing key. When no
// exchange name is configured, UnsafePush routes by the configured queue name
// instead, so the message lands on that queue.
func (client *Client) PushQueueMessage(data []byte) error {
	const noRoutingKey = ""
	return client.PushExchangeMessage(noRoutingKey, data)
}
// UnsafePush publishes data once, without waiting for a confirmation, using a
// 30 second publish context. It returns errNotConnected when the client is
// not ready, otherwise whatever the publish call returns.
//
// If an exchange name is configured, the message goes to that exchange with
// routingKey and the exchange publish settings. Otherwise it is published
// with an empty exchange name, the configured queue name as routing key
// (routingKey is ignored) and the queue publish settings.
func (client *Client) UnsafePush(data []byte, routingKey string) error {
	if !client.isReady {
		return errNotConnected
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if exchange := client.exchangeSettings.Name; exchange != "" {
		opts := client.exchangeSettings.ExchangePublishSettings
		msg := amqp.Publishing{
			ContentType: opts.ContentType,
			Body:        data,
		}
		return client.Channel.PublishWithContext(ctx, exchange, routingKey, opts.Mandatory, opts.Immediate, msg)
	}

	opts := client.queueSettings.QueuePublishSettings
	msg := amqp.Publishing{
		DeliveryMode: opts.DeliveryMode,
		ContentType:  opts.ContentType,
		Body:         data,
	}
	return client.Channel.PublishWithContext(ctx, "", client.queueSettings.Name, opts.Mandatory, opts.Immediate, msg)
}
// QueueDeclare declares an additional queue on the client's current channel,
// for use after the client has been created. It returns errNotConnected when
// the client is not ready.
func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) {
	if !client.isReady {
		var none amqp.Queue
		return none, errNotConnected
	}

	opts := queueDeclareSettings
	return client.Channel.QueueDeclare(opts.Name, opts.Durable, opts.DeleteWhenUnused, opts.Exclusive, opts.NoWait, opts.Arguments)
}
// QueueBind adds a further queue binding on the client's current channel, for
// use after the client has been created. The settings are passed through as
// given; the Enabled field is not consulted. It returns errNotConnected when
// the client is not ready.
func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error {
	if !client.isReady {
		return errNotConnected
	}

	opts := queueBindSettings
	return client.Channel.QueueBind(opts.QueueName, opts.RoutingKey, opts.Exchange, opts.NoWait, opts.Arguments)
}
// ConsumeChannelMessages starts a consumer on the client's current channel
// and returns the delivery channel produced by Channel.Consume. It returns
// errNotConnected when the client is not ready.
//
// Unless AutoAck is set, each delivery should be acknowledged with
// delivery.Ack once processed, or rejected with delivery.Nack on failure;
// otherwise unacknowledged messages accumulate on the server.
func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) {
	if !client.isReady {
		return nil, errNotConnected
	}

	opts := channelConsumeSettings
	return client.Channel.Consume(opts.QueueName, opts.Consumer, opts.AutoAck, opts.Exclusive, opts.NoLocal, opts.NoWait, opts.Arguments)
}
// Close signals shutdown by closing client.done, then closes the channel and
// the connection. It returns errAlreadyClosed when the client is not ready.
//
// The connection is closed even if closing the channel fails, and the client
// is marked not ready on every return path. If both closes fail, the channel
// error is returned.
func (client *Client) Close() error {
	if !client.isReady {
		return errAlreadyClosed
	}

	close(client.done)
	// Once done is closed the client is unusable regardless of how the closes
	// below turn out. Clearing the flag on every path also keeps a second
	// Close call from reaching close(client.done) again, which would panic.
	defer func() { client.isReady = false }()

	chErr := client.Channel.Close()
	// Do not return early on chErr: that would leak the connection.
	connErr := client.connection.Close()

	if chErr != nil {
		return chErr
	}
	return connErr
}