From 3d75bf7950f687322b91009fe2caa33e96e84a1f Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 17 Feb 2023 07:51:13 +0100 Subject: [PATCH] changed file name --- rabbitmqclient.go | 483 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 rabbitmqclient.go diff --git a/rabbitmqclient.go b/rabbitmqclient.go new file mode 100644 index 0000000..c256128 --- /dev/null +++ b/rabbitmqclient.go @@ -0,0 +1,483 @@ +package gocnjhelper + +import ( + "context" + "errors" + "fmt" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/rs/zerolog" +) + +var ( + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) + +const ( + DeliveryModeTransient = amqp.Transient + DeliveryModePersistent = amqp.Persistent + + ContentTypeJson = "application/json" + + SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName" + Test123 = "test1234" + TestHaha = "testhaha" +) + +type Config struct { + ReconnectDelay time.Duration + ReInitDelay time.Duration + ResendDelay time.Duration +} + +var ConfigDefault = Config{ + ReconnectDelay: 5 * time.Second, + ReInitDelay: 2 * time.Second, + ResendDelay: 5 * time.Second, +} + +var ChannelQosSettingsDefault = ChannelQosSettings{ + PrefetchCount: 1, + PrefetchSize: 0, + Global: false, +} + +type ExchangeSettings struct { + Enabled bool + Name string + Type string + Durable bool + AutoDeleted bool + Internal bool + NoWait bool + Arguments amqp.Table + ExchangePublishSettings ExchangePublishSettings +} + +type ExchangePublishSettings struct { + Mandatory bool + Immediate bool + ContentType string +} + +type QueueSettings struct { + Enabled bool + Name string + Durable bool + DeleteWhenUnused bool + Exclusive bool + NoWait bool + Arguments amqp.Table + QueuePublishSettings QueuePublishSettings + QueueBindSettings QueueBindSettings +} + +type QueuePublishSettings struct { + Mandatory bool + Immediate bool + DeliveryMode uint8 + ContentType string +} + +type ChannelConsumeSettings struct { + QueueName string + Consumer string + AutoAck bool + Exclusive bool + NoLocal bool + NoWait bool + Arguments amqp.Table +} + +type QueueDeclareSettings struct { + Name string + Durable bool + DeleteWhenUnused bool + Exclusive bool + NoWait bool + Arguments amqp.Table +} + +type QueueBindSettings struct { + Enabled bool + QueueName string + RoutingKey string + Exchange string + NoWait bool + Arguments amqp.Table +} + +type ChannelQosSettings struct { + PrefetchCount int + PrefetchSize int + Global bool +} + +type Client struct { + exchangeSettings ExchangeSettings + queueSettings QueueSettings + channelQosSettings ChannelQosSettings + AssignedQueueName string // automatically assigned queue name if none is specified + config Config + logger zerolog.Logger + connection *amqp.Connection + Channel *amqp.Channel + done chan bool + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error + notifyConfirm chan amqp.Confirmation + isReady bool +} + +func GetConnectionString(username string, password string, host string) string { + return fmt.Sprintf("amqp://%s:%s@%s/", username, password, host) +} + +// New creates a new consumer state instance, and automatically +// attempts to connect to the server. +func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client { + client := Client{ + logger: zerolog.New(os.Stdout).With().Logger(), + exchangeSettings: exchangeSettings, + queueSettings: queueSettings, + channelQosSettings: channelQosSettings, + config: config, + done: make(chan bool), + } + + go client.handleReconnect(addr) + return &client +} + +// handleReconnect will wait for a connection error on +// notifyConnClose, and then continuously attempt to reconnect. +func (client *Client) handleReconnect(addr string) { + for { + client.isReady = false + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Attempting to connect") + + conn, err := client.connect(addr) + + if err != nil { + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to connect. Retrying...") + + select { + case <-client.done: + return + case <-time.After(client.config.ReconnectDelay): + } + continue + } + + if done := client.handleReInit(conn); done { + break + } + } +} + +// connect will create a new AMQP connection +func (client *Client) connect(addr string) (*amqp.Connection, error) { + conn, err := amqp.Dial(addr) + + if err != nil { + return nil, err + } + + client.changeConnection(conn) + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connected!") + return conn, nil +} + +// handleReconnect will wait for a channel error +// and then continuously attempt to re-initialize both channels +func (client *Client) handleReInit(conn *amqp.Connection) bool { + for { + client.isReady = false + + err := client.init(conn) + + if err != nil { + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to initialize channel. Retrying...") + + select { + case <-client.done: + return true + case <-time.After(client.config.ReInitDelay): + } + continue + } + + select { + case <-client.done: + return true + case <-client.notifyConnClose: + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...") + return false + case <-client.notifyChanClose: + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Channel closed. Re-running init...") + } + } +} + +// init will initialize channel & declare queue +func (client *Client) init(conn *amqp.Connection) error { + ch, err := conn.Channel() + + if err != nil { + return err + } + + err = ch.Confirm(false) + + if err != nil { + return err + } + + if err = ch.Qos( + client.channelQosSettings.PrefetchCount, + client.channelQosSettings.PrefetchSize, + client.channelQosSettings.Global, + ); err != nil { + return err + } + + if client.exchangeSettings.Enabled { + err = ch.ExchangeDeclare( + client.exchangeSettings.Name, + client.exchangeSettings.Type, + client.exchangeSettings.Durable, + client.exchangeSettings.AutoDeleted, + client.exchangeSettings.Internal, + client.exchangeSettings.NoWait, + client.exchangeSettings.Arguments, + ) + + if err != nil { + return err + } + } + + if client.queueSettings.Enabled { + q, err := ch.QueueDeclare( + client.queueSettings.Name, + client.queueSettings.Durable, + client.queueSettings.DeleteWhenUnused, + client.queueSettings.Exclusive, + client.queueSettings.NoWait, + client.queueSettings.Arguments, + ) + + if err != nil { + return err + } + + if client.queueSettings.QueueBindSettings.Enabled { + queueBindName := client.queueSettings.QueueBindSettings.QueueName + + if client.queueSettings.QueueBindSettings.QueueName == SetQueueNameToAutomaticallyAssignedQueueName { + queueBindName = q.Name + } + + err = ch.QueueBind( + queueBindName, + client.queueSettings.QueueBindSettings.RoutingKey, + client.queueSettings.QueueBindSettings.Exchange, + client.queueSettings.QueueBindSettings.NoWait, + client.queueSettings.QueueBindSettings.Arguments, + ) + + if err != nil { + return err + } + } + + client.AssignedQueueName = q.Name + } + + client.changeChannel(ch) + + client.isReady = true + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Setup!") + + return nil +} + +// changeConnection takes a new connection to the queue, +// and updates the close listener to reflect this. +func (client *Client) changeConnection(connection *amqp.Connection) { + client.connection = connection + client.notifyConnClose = make(chan *amqp.Error, 1) + client.connection.NotifyClose(client.notifyConnClose) +} + +// changeChannel takes a new channel to the queue, +// and updates the channel listeners to reflect this. +func (client *Client) changeChannel(channel *amqp.Channel) { + client.Channel = channel + client.notifyChanClose = make(chan *amqp.Error, 1) + client.notifyConfirm = make(chan amqp.Confirmation, 1) + client.Channel.NotifyClose(client.notifyChanClose) + client.Channel.NotifyPublish(client.notifyConfirm) +} + +// Push will push data onto the queue, and wait for a confirm. +// If no confirms are received until within the resendTimeout, +// it continuously re-sends messages until a confirm is received. +// This will block until the server sends a confirm. Errors are +// only returned if the push action itself fails, see UnsafePush. +func (client *Client) PushExchangeMessage(routingKey string, data []byte) error { + if !client.isReady { + return errors.New("failed to push: not connected") + } + + for { + err := client.UnsafePush(data, routingKey) + + if err != nil { + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push failed. Retrying...") + + select { + case <-client.done: + return errShutdown + case <-time.After(client.config.ResendDelay): + } + continue + } + + select { + case confirm := <-client.notifyConfirm: + if confirm.Ack { + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push confirmed!") + return nil + } + case <-time.After(client.config.ResendDelay): + } + + client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push didn't confirm. Retrying...") + } +} + +func (client *Client) PushQueueMessage(data []byte) error { + return client.PushExchangeMessage("", data) +} + +// UnsafePush will push to the queue without checking for +// confirmation. It returns an error if it fails to connect. +// No guarantees are provided for whether the server will +// receive the message. +func (client *Client) UnsafePush(data []byte, routingKey string) error { + if !client.isReady { + return errNotConnected + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // exchange + if client.exchangeSettings.Name != "" { + return client.Channel.PublishWithContext( + ctx, + client.exchangeSettings.Name, // exchange + routingKey, // routing key + client.exchangeSettings.ExchangePublishSettings.Mandatory, + client.exchangeSettings.ExchangePublishSettings.Immediate, + amqp.Publishing{ + ContentType: client.exchangeSettings.ExchangePublishSettings.ContentType, + Body: data, + }) + } + + // queue + return client.Channel.PublishWithContext( + ctx, + "", // exchange + client.queueSettings.Name, // routing key + client.queueSettings.QueuePublishSettings.Mandatory, + client.queueSettings.QueuePublishSettings.Immediate, + amqp.Publishing{ + DeliveryMode: client.queueSettings.QueuePublishSettings.DeliveryMode, + ContentType: client.queueSettings.QueuePublishSettings.ContentType, + Body: data, + }, + ) +} + +// can be used to open additional queues after client opening +func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) { + if !client.isReady { + return amqp.Queue{}, errNotConnected + } + + return client.Channel.QueueDeclare( + queueDeclareSettings.Name, + queueDeclareSettings.Durable, + queueDeclareSettings.DeleteWhenUnused, + queueDeclareSettings.Exclusive, + queueDeclareSettings.NoWait, + queueDeclareSettings.Arguments, + ) +} + +// can be used to specify further queue bindings after client opening +func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error { + if !client.isReady { + return errNotConnected + } + + return client.Channel.QueueBind( + queueBindSettings.QueueName, + queueBindSettings.RoutingKey, + queueBindSettings.Exchange, + queueBindSettings.NoWait, + queueBindSettings.Arguments, + ) +} + +// Consume will continuously put queue items on the channel. +// It is required to call delivery.Ack when it has been +// successfully processed, or delivery.Nack when it fails. +// Ignoring this will cause data to build up on the server. +func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) { + if !client.isReady { + return nil, errNotConnected + } + + return client.Channel.Consume( + channelConsumeSettings.QueueName, + channelConsumeSettings.Consumer, + channelConsumeSettings.AutoAck, + channelConsumeSettings.Exclusive, + channelConsumeSettings.NoLocal, + channelConsumeSettings.NoWait, + channelConsumeSettings.Arguments, + ) +} + +// Close will cleanly shutdown the channel and connection. +func (client *Client) Close() error { + if !client.isReady { + return errAlreadyClosed + } + + close(client.done) + + err := client.Channel.Close() + + if err != nil { + return err + } + + err = client.connection.Close() + + if err != nil { + return err + } + + client.isReady = false + + return nil +}