// Package gocnjhelper wraps an AMQP 0-9-1 connection and channel in a Client
// that connects, declares its exchange/queue topology and reconnects from a
// background goroutine.
package gocnjhelper

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/rs/zerolog"
)

// Sentinel errors returned by the Client methods.
var (
	errNotConnected  = errors.New("not connected to a server")
	errAlreadyClosed = errors.New("already closed: not connected to the server")
	errShutdown      = errors.New("client is shutting down")
)

const (
	// DeliveryModeTransient and DeliveryModePersistent re-export the amqp
	// delivery modes for use in QueuePublishSettings.DeliveryMode.
	DeliveryModeTransient  = amqp.Transient
	DeliveryModePersistent = amqp.Persistent

	// ContentTypeJson is the content type string for JSON payloads.
	ContentTypeJson = "application/json"

	// SetQueueNameToAutomaticallyAssignedQueueName is a placeholder for
	// QueueBindSettings.QueueName: during channel initialization it is
	// replaced by the name returned from the queue declaration.
	SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName"

	// NOTE(review): Test123 and TestHaha are not referenced in this file;
	// they are kept because they are exported. Confirm whether they can go.
	Test123  = "test1234"
	TestHaha = "testhaha"
)

// Config holds the delays used by the retry loops of a Client.
type Config struct {
	ReconnectDelay time.Duration // wait between failed connection attempts
	ReInitDelay    time.Duration // wait between failed channel initializations
	ResendDelay    time.Duration // wait before a push is retried
}

// ConfigDefault is a ready-to-use Config.
var ConfigDefault = Config{
	ReconnectDelay: 5 * time.Second,
	ReInitDelay:    2 * time.Second,
	ResendDelay:    5 * time.Second,
}

// ChannelQosSettingsDefault is a ready-to-use ChannelQosSettings with a
// prefetch count of one.
var ChannelQosSettingsDefault = ChannelQosSettings{
	PrefetchCount: 1,
	PrefetchSize:  0,
	Global:        false,
}

// ExchangeSettings describes the exchange a Client works with. When Enabled
// is set the exchange is declared during channel initialization using Name,
// Type, Durable, AutoDeleted, Internal, NoWait and Arguments. A non-empty
// Name additionally makes UnsafePush publish to that exchange.
type ExchangeSettings struct {
	Enabled                 bool
	Name                    string
	Type                    string
	Durable                 bool
	AutoDeleted             bool
	Internal                bool
	NoWait                  bool
	Arguments               amqp.Table
	ExchangePublishSettings ExchangePublishSettings
}

// ExchangePublishSettings holds the publish options used when a message is
// sent to the configured exchange.
type ExchangePublishSettings struct {
	Mandatory   bool
	Immediate   bool
	ContentType string
}

// QueueSettings describes the queue a Client works with. When Enabled is set
// the queue is declared during channel initialization and, if
// QueueBindSettings.Enabled is also set, bound afterwards.
type QueueSettings struct {
	Enabled              bool
	Name                 string
	Durable              bool
	DeleteWhenUnused     bool
	Exclusive            bool
	NoWait               bool
	Arguments            amqp.Table
	QueuePublishSettings QueuePublishSettings
	QueueBindSettings    QueueBindSettings
}

// QueuePublishSettings holds the publish options used when a message is sent
// directly to the configured queue.
type QueuePublishSettings struct {
	Mandatory    bool
	Immediate    bool
	DeliveryMode uint8
	ContentType  string
}

// ChannelConsumeSettings holds the arguments forwarded to the channel's
// Consume call by ConsumeChannelMessages.
type ChannelConsumeSettings struct {
	QueueName string
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Arguments amqp.Table
}

// QueueDeclareSettings holds the arguments forwarded to the channel's
// QueueDeclare call by Client.QueueDeclare.
type QueueDeclareSettings struct {
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Exclusive        bool
	NoWait           bool
	Arguments        amqp.Table
}

// QueueBindSettings holds the arguments of a queue binding. Enabled is only
// consulted for the binding embedded in QueueSettings; QueueName may be set to
// SetQueueNameToAutomaticallyAssignedQueueName there.
type QueueBindSettings struct {
	Enabled    bool
	QueueName  string
	RoutingKey string
	Exchange   string
	NoWait     bool
	Arguments  amqp.Table
}

// ChannelQosSettings holds the arguments forwarded to the channel's Qos call
// during channel initialization.
type ChannelQosSettings struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

// Client owns one AMQP connection and one channel and keeps both alive from a
// background goroutine started by NewClient.
//
// NOTE(review): isReady, Channel and AssignedQueueName are written by that
// goroutine and read by the exported methods without synchronization.
type Client struct {
	exchangeSettings   ExchangeSettings
	queueSettings      QueueSettings
	channelQosSettings ChannelQosSettings

	// AssignedQueueName is the queue name returned by the queue declaration
	// (the automatically assigned name if none is specified).
	AssignedQueueName string

	config Config
	logger zerolog.Logger

	connection *amqp.Connection
	Channel    *amqp.Channel

	done            chan bool // closed by Close to stop the background goroutine
	notifyConnClose chan *amqp.Error
	notifyChanClose chan *amqp.Error
	notifyConfirm   chan amqp.Confirmation
	isReady         bool // true once the channel has been initialized
}

// GetConnectionString builds an "amqp://user:password@host/" URI.
//
// The username and password are percent-encoded, so credentials containing
// characters such as '@', '/', ':' or '?' still yield a parseable URI. Pass
// the raw (unescaped) credentials. host is inserted verbatim.
func GetConnectionString(username string, password string, host string) string {
	credentials := url.UserPassword(username, password).String()
	return fmt.Sprintf("amqp://%s@%s/", credentials, host)
}

// NewClient creates a new Client and starts a goroutine that connects to addr
// and keeps the connection alive. It returns immediately; the client is not
// necessarily connected yet when NewClient returns.
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
	client := &Client{
		logger:             zerolog.New(os.Stdout).With().Logger(),
		exchangeSettings:   exchangeSettings,
		queueSettings:      queueSettings,
		channelQosSettings: channelQosSettings,
		config:             config,
		done:               make(chan bool),
	}
	go client.handleReconnect(addr)
	return client
}

// handleReconnect connects to addr, retrying every config.ReconnectDelay on
// failure, and hands each established connection to handleReInit. It returns
// once client.done is closed.
func (client *Client) handleReconnect(addr string) {
	for {
		client.isReady = false
		client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Attempting to connect")

		conn, err := client.connect(addr)
		if err != nil {
			// The error itself is not logged here because addr may embed
			// credentials and URL parse errors echo the address.
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to connect. Retrying...")

			select {
			case <-client.done:
				return
			case <-time.After(client.config.ReconnectDelay):
			}
			continue
		}

		if done := client.handleReInit(conn); done {
			break
		}
	}
}

// connect dials addr and registers the new connection on the client.
func (client *Client) connect(addr string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(addr)
	if err != nil {
		return nil, err
	}

	client.changeConnection(conn)
	client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connected!")
	return conn, nil
}

// handleReInit initializes a channel on conn and re-initializes it whenever
// the channel is closed. It returns true when the client is shutting down and
// false when the connection was closed and a reconnect is required.
func (client *Client) handleReInit(conn *amqp.Connection) bool {
	for {
		client.isReady = false

		err := client.init(conn)
		if err != nil {
			client.logger.Debug().Err(err).Str("lib", "go-cnj-helper").Msg("Failed to initialize channel. Retrying...")

			select {
			case <-client.done:
				return true
			case <-client.notifyConnClose:
				// Initialization can never succeed on a closed connection,
				// so hand control back to handleReconnect instead of
				// retrying forever.
				client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
				return false
			case <-time.After(client.config.ReInitDelay):
			}
			continue
		}

		select {
		case <-client.done:
			return true
		case <-client.notifyConnClose:
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
			return false
		case <-client.notifyChanClose:
			client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Channel closed. Re-running init...")
		}
	}
}

// init opens a channel on conn, enables publisher confirms, applies the QoS
// settings and declares the configured exchange, queue and binding. On
// success the channel is installed on the client and the client is marked
// ready; on failure the channel is closed again and the error is returned.
func (client *Client) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}

	// Until the channel is handed to changeChannel nothing else references
	// it, so close it on every failure path below instead of leaking it.
	installed := false
	defer func() {
		if !installed {
			// Best effort: the setup error is what the caller needs to see.
			_ = ch.Close()
		}
	}()

	if err = ch.Confirm(false); err != nil {
		return err
	}

	qos := client.channelQosSettings
	if err = ch.Qos(qos.PrefetchCount, qos.PrefetchSize, qos.Global); err != nil {
		return err
	}

	if client.exchangeSettings.Enabled {
		exchange := client.exchangeSettings
		err = ch.ExchangeDeclare(
			exchange.Name,
			exchange.Type,
			exchange.Durable,
			exchange.AutoDeleted,
			exchange.Internal,
			exchange.NoWait,
			exchange.Arguments,
		)
		if err != nil {
			return err
		}
	}

	if client.queueSettings.Enabled {
		queue := client.queueSettings
		q, err := ch.QueueDeclare(
			queue.Name,
			queue.Durable,
			queue.DeleteWhenUnused,
			queue.Exclusive,
			queue.NoWait,
			queue.Arguments,
		)
		if err != nil {
			return err
		}

		if bind := queue.QueueBindSettings; bind.Enabled {
			bindName := bind.QueueName
			if bindName == SetQueueNameToAutomaticallyAssignedQueueName {
				bindName = q.Name
			}
			err = ch.QueueBind(
				bindName,
				bind.RoutingKey,
				bind.Exchange,
				bind.NoWait,
				bind.Arguments,
			)
			if err != nil {
				return err
			}
		}

		client.AssignedQueueName = q.Name
	}

	client.changeChannel(ch)
	installed = true
	client.isReady = true
	client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Setup!")
	return nil
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) { client.connection = connection client.notifyConnClose = make(chan *amqp.Error, 1) client.connection.NotifyClose(client.notifyConnClose) } // changeChannel takes a new channel to the queue, // and updates the channel listeners to reflect this. func (client *Client) changeChannel(channel *amqp.Channel) { client.Channel = channel client.notifyChanClose = make(chan *amqp.Error, 1) client.notifyConfirm = make(chan amqp.Confirmation, 1) client.Channel.NotifyClose(client.notifyChanClose) client.Channel.NotifyPublish(client.notifyConfirm) } // Push will push data onto the queue, and wait for a confirm. // If no confirms are received until within the resendTimeout, // it continuously re-sends messages until a confirm is received. // This will block until the server sends a confirm. Errors are // only returned if the push action itself fails, see UnsafePush. func (client *Client) PushExchangeMessage(routingKey string, data []byte) error { if !client.isReady { return errors.New("failed to push: not connected") } for { err := client.UnsafePush(data, routingKey) if err != nil { client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push failed. Retrying...") select { case <-client.done: return errShutdown case <-time.After(client.config.ResendDelay): } continue } select { case confirm := <-client.notifyConfirm: if confirm.Ack { client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push confirmed!") return nil } case <-time.After(client.config.ResendDelay): } client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push didn't confirm. Retrying...") } } func (client *Client) PushQueueMessage(data []byte) error { return client.PushExchangeMessage("", data) } // UnsafePush will push to the queue without checking for // confirmation. It returns an error if it fails to connect. // No guarantees are provided for whether the server will // receive the message. 
func (client *Client) UnsafePush(data []byte, routingKey string) error { if !client.isReady { return errNotConnected } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // exchange if client.exchangeSettings.Name != "" { return client.Channel.PublishWithContext( ctx, client.exchangeSettings.Name, // exchange routingKey, // routing key client.exchangeSettings.ExchangePublishSettings.Mandatory, client.exchangeSettings.ExchangePublishSettings.Immediate, amqp.Publishing{ ContentType: client.exchangeSettings.ExchangePublishSettings.ContentType, Body: data, }) } // queue return client.Channel.PublishWithContext( ctx, "", // exchange client.queueSettings.Name, // routing key client.queueSettings.QueuePublishSettings.Mandatory, client.queueSettings.QueuePublishSettings.Immediate, amqp.Publishing{ DeliveryMode: client.queueSettings.QueuePublishSettings.DeliveryMode, ContentType: client.queueSettings.QueuePublishSettings.ContentType, Body: data, }, ) } // can be used to open additional queues after client opening func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) { if !client.isReady { return amqp.Queue{}, errNotConnected } return client.Channel.QueueDeclare( queueDeclareSettings.Name, queueDeclareSettings.Durable, queueDeclareSettings.DeleteWhenUnused, queueDeclareSettings.Exclusive, queueDeclareSettings.NoWait, queueDeclareSettings.Arguments, ) } // can be used to specify further queue bindings after client opening func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error { if !client.isReady { return errNotConnected } return client.Channel.QueueBind( queueBindSettings.QueueName, queueBindSettings.RoutingKey, queueBindSettings.Exchange, queueBindSettings.NoWait, queueBindSettings.Arguments, ) } // Consume will continuously put queue items on the channel. // It is required to call delivery.Ack when it has been // successfully processed, or delivery.Nack when it fails. 
// Ignoring this will cause data to build up on the server. func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) { if !client.isReady { return nil, errNotConnected } return client.Channel.Consume( channelConsumeSettings.QueueName, channelConsumeSettings.Consumer, channelConsumeSettings.AutoAck, channelConsumeSettings.Exclusive, channelConsumeSettings.NoLocal, channelConsumeSettings.NoWait, channelConsumeSettings.Arguments, ) } // Close will cleanly shutdown the channel and connection. func (client *Client) Close() error { if !client.isReady { return errAlreadyClosed } close(client.done) err := client.Channel.Close() if err != nil { return err } err = client.connection.Close() if err != nil { return err } client.isReady = false return nil }