parent
7413d07ecf
commit
3d75bf7950
|
@ -0,0 +1,483 @@
|
||||||
|
package gocnjhelper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errNotConnected = errors.New("not connected to a server")
|
||||||
|
errAlreadyClosed = errors.New("already closed: not connected to the server")
|
||||||
|
errShutdown = errors.New("client is shutting down")
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DeliveryModeTransient = amqp.Transient
|
||||||
|
DeliveryModePersistent = amqp.Persistent
|
||||||
|
|
||||||
|
ContentTypeJson = "application/json"
|
||||||
|
|
||||||
|
SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName"
|
||||||
|
Test123 = "test1234"
|
||||||
|
TestHaha = "testhaha"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
ReconnectDelay time.Duration
|
||||||
|
ReInitDelay time.Duration
|
||||||
|
ResendDelay time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
var ConfigDefault = Config{
|
||||||
|
ReconnectDelay: 5 * time.Second,
|
||||||
|
ReInitDelay: 2 * time.Second,
|
||||||
|
ResendDelay: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
var ChannelQosSettingsDefault = ChannelQosSettings{
|
||||||
|
PrefetchCount: 1,
|
||||||
|
PrefetchSize: 0,
|
||||||
|
Global: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExchangeSettings struct {
|
||||||
|
Enabled bool
|
||||||
|
Name string
|
||||||
|
Type string
|
||||||
|
Durable bool
|
||||||
|
AutoDeleted bool
|
||||||
|
Internal bool
|
||||||
|
NoWait bool
|
||||||
|
Arguments amqp.Table
|
||||||
|
ExchangePublishSettings ExchangePublishSettings
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExchangePublishSettings struct {
|
||||||
|
Mandatory bool
|
||||||
|
Immediate bool
|
||||||
|
ContentType string
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueueSettings struct {
|
||||||
|
Enabled bool
|
||||||
|
Name string
|
||||||
|
Durable bool
|
||||||
|
DeleteWhenUnused bool
|
||||||
|
Exclusive bool
|
||||||
|
NoWait bool
|
||||||
|
Arguments amqp.Table
|
||||||
|
QueuePublishSettings QueuePublishSettings
|
||||||
|
QueueBindSettings QueueBindSettings
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueuePublishSettings struct {
|
||||||
|
Mandatory bool
|
||||||
|
Immediate bool
|
||||||
|
DeliveryMode uint8
|
||||||
|
ContentType string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelConsumeSettings struct {
|
||||||
|
QueueName string
|
||||||
|
Consumer string
|
||||||
|
AutoAck bool
|
||||||
|
Exclusive bool
|
||||||
|
NoLocal bool
|
||||||
|
NoWait bool
|
||||||
|
Arguments amqp.Table
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueueDeclareSettings struct {
|
||||||
|
Name string
|
||||||
|
Durable bool
|
||||||
|
DeleteWhenUnused bool
|
||||||
|
Exclusive bool
|
||||||
|
NoWait bool
|
||||||
|
Arguments amqp.Table
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueueBindSettings struct {
|
||||||
|
Enabled bool
|
||||||
|
QueueName string
|
||||||
|
RoutingKey string
|
||||||
|
Exchange string
|
||||||
|
NoWait bool
|
||||||
|
Arguments amqp.Table
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChannelQosSettings struct {
|
||||||
|
PrefetchCount int
|
||||||
|
PrefetchSize int
|
||||||
|
Global bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
exchangeSettings ExchangeSettings
|
||||||
|
queueSettings QueueSettings
|
||||||
|
channelQosSettings ChannelQosSettings
|
||||||
|
AssignedQueueName string // automatically assigned queue name if none is specified
|
||||||
|
config Config
|
||||||
|
logger zerolog.Logger
|
||||||
|
connection *amqp.Connection
|
||||||
|
Channel *amqp.Channel
|
||||||
|
done chan bool
|
||||||
|
notifyConnClose chan *amqp.Error
|
||||||
|
notifyChanClose chan *amqp.Error
|
||||||
|
notifyConfirm chan amqp.Confirmation
|
||||||
|
isReady bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetConnectionString(username string, password string, host string) string {
|
||||||
|
return fmt.Sprintf("amqp://%s:%s@%s/", username, password, host)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new consumer state instance, and automatically
|
||||||
|
// attempts to connect to the server.
|
||||||
|
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
|
||||||
|
client := Client{
|
||||||
|
logger: zerolog.New(os.Stdout).With().Logger(),
|
||||||
|
exchangeSettings: exchangeSettings,
|
||||||
|
queueSettings: queueSettings,
|
||||||
|
channelQosSettings: channelQosSettings,
|
||||||
|
config: config,
|
||||||
|
done: make(chan bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
go client.handleReconnect(addr)
|
||||||
|
return &client
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleReconnect will wait for a connection error on
|
||||||
|
// notifyConnClose, and then continuously attempt to reconnect.
|
||||||
|
func (client *Client) handleReconnect(addr string) {
|
||||||
|
for {
|
||||||
|
client.isReady = false
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Attempting to connect")
|
||||||
|
|
||||||
|
conn, err := client.connect(addr)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to connect. Retrying...")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-client.done:
|
||||||
|
return
|
||||||
|
case <-time.After(client.config.ReconnectDelay):
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if done := client.handleReInit(conn); done {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// connect will create a new AMQP connection
|
||||||
|
func (client *Client) connect(addr string) (*amqp.Connection, error) {
|
||||||
|
conn, err := amqp.Dial(addr)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client.changeConnection(conn)
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connected!")
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleReconnect will wait for a channel error
|
||||||
|
// and then continuously attempt to re-initialize both channels
|
||||||
|
func (client *Client) handleReInit(conn *amqp.Connection) bool {
|
||||||
|
for {
|
||||||
|
client.isReady = false
|
||||||
|
|
||||||
|
err := client.init(conn)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to initialize channel. Retrying...")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-client.done:
|
||||||
|
return true
|
||||||
|
case <-time.After(client.config.ReInitDelay):
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-client.done:
|
||||||
|
return true
|
||||||
|
case <-client.notifyConnClose:
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
|
||||||
|
return false
|
||||||
|
case <-client.notifyChanClose:
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Channel closed. Re-running init...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// init will initialize channel & declare queue
|
||||||
|
func (client *Client) init(conn *amqp.Connection) error {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.Confirm(false)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = ch.Qos(
|
||||||
|
client.channelQosSettings.PrefetchCount,
|
||||||
|
client.channelQosSettings.PrefetchSize,
|
||||||
|
client.channelQosSettings.Global,
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if client.exchangeSettings.Enabled {
|
||||||
|
err = ch.ExchangeDeclare(
|
||||||
|
client.exchangeSettings.Name,
|
||||||
|
client.exchangeSettings.Type,
|
||||||
|
client.exchangeSettings.Durable,
|
||||||
|
client.exchangeSettings.AutoDeleted,
|
||||||
|
client.exchangeSettings.Internal,
|
||||||
|
client.exchangeSettings.NoWait,
|
||||||
|
client.exchangeSettings.Arguments,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if client.queueSettings.Enabled {
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
client.queueSettings.Name,
|
||||||
|
client.queueSettings.Durable,
|
||||||
|
client.queueSettings.DeleteWhenUnused,
|
||||||
|
client.queueSettings.Exclusive,
|
||||||
|
client.queueSettings.NoWait,
|
||||||
|
client.queueSettings.Arguments,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if client.queueSettings.QueueBindSettings.Enabled {
|
||||||
|
queueBindName := client.queueSettings.QueueBindSettings.QueueName
|
||||||
|
|
||||||
|
if client.queueSettings.QueueBindSettings.QueueName == SetQueueNameToAutomaticallyAssignedQueueName {
|
||||||
|
queueBindName = q.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.QueueBind(
|
||||||
|
queueBindName,
|
||||||
|
client.queueSettings.QueueBindSettings.RoutingKey,
|
||||||
|
client.queueSettings.QueueBindSettings.Exchange,
|
||||||
|
client.queueSettings.QueueBindSettings.NoWait,
|
||||||
|
client.queueSettings.QueueBindSettings.Arguments,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client.AssignedQueueName = q.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
client.changeChannel(ch)
|
||||||
|
|
||||||
|
client.isReady = true
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Setup!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// changeConnection takes a new connection to the queue,
|
||||||
|
// and updates the close listener to reflect this.
|
||||||
|
func (client *Client) changeConnection(connection *amqp.Connection) {
|
||||||
|
client.connection = connection
|
||||||
|
client.notifyConnClose = make(chan *amqp.Error, 1)
|
||||||
|
client.connection.NotifyClose(client.notifyConnClose)
|
||||||
|
}
|
||||||
|
|
||||||
|
// changeChannel takes a new channel to the queue,
|
||||||
|
// and updates the channel listeners to reflect this.
|
||||||
|
func (client *Client) changeChannel(channel *amqp.Channel) {
|
||||||
|
client.Channel = channel
|
||||||
|
client.notifyChanClose = make(chan *amqp.Error, 1)
|
||||||
|
client.notifyConfirm = make(chan amqp.Confirmation, 1)
|
||||||
|
client.Channel.NotifyClose(client.notifyChanClose)
|
||||||
|
client.Channel.NotifyPublish(client.notifyConfirm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push will push data onto the queue, and wait for a confirm.
|
||||||
|
// If no confirms are received until within the resendTimeout,
|
||||||
|
// it continuously re-sends messages until a confirm is received.
|
||||||
|
// This will block until the server sends a confirm. Errors are
|
||||||
|
// only returned if the push action itself fails, see UnsafePush.
|
||||||
|
func (client *Client) PushExchangeMessage(routingKey string, data []byte) error {
|
||||||
|
if !client.isReady {
|
||||||
|
return errors.New("failed to push: not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
err := client.UnsafePush(data, routingKey)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push failed. Retrying...")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-client.done:
|
||||||
|
return errShutdown
|
||||||
|
case <-time.After(client.config.ResendDelay):
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case confirm := <-client.notifyConfirm:
|
||||||
|
if confirm.Ack {
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push confirmed!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case <-time.After(client.config.ResendDelay):
|
||||||
|
}
|
||||||
|
|
||||||
|
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push didn't confirm. Retrying...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *Client) PushQueueMessage(data []byte) error {
|
||||||
|
return client.PushExchangeMessage("", data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnsafePush will push to the queue without checking for
|
||||||
|
// confirmation. It returns an error if it fails to connect.
|
||||||
|
// No guarantees are provided for whether the server will
|
||||||
|
// receive the message.
|
||||||
|
func (client *Client) UnsafePush(data []byte, routingKey string) error {
|
||||||
|
if !client.isReady {
|
||||||
|
return errNotConnected
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// exchange
|
||||||
|
if client.exchangeSettings.Name != "" {
|
||||||
|
return client.Channel.PublishWithContext(
|
||||||
|
ctx,
|
||||||
|
client.exchangeSettings.Name, // exchange
|
||||||
|
routingKey, // routing key
|
||||||
|
client.exchangeSettings.ExchangePublishSettings.Mandatory,
|
||||||
|
client.exchangeSettings.ExchangePublishSettings.Immediate,
|
||||||
|
amqp.Publishing{
|
||||||
|
ContentType: client.exchangeSettings.ExchangePublishSettings.ContentType,
|
||||||
|
Body: data,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// queue
|
||||||
|
return client.Channel.PublishWithContext(
|
||||||
|
ctx,
|
||||||
|
"", // exchange
|
||||||
|
client.queueSettings.Name, // routing key
|
||||||
|
client.queueSettings.QueuePublishSettings.Mandatory,
|
||||||
|
client.queueSettings.QueuePublishSettings.Immediate,
|
||||||
|
amqp.Publishing{
|
||||||
|
DeliveryMode: client.queueSettings.QueuePublishSettings.DeliveryMode,
|
||||||
|
ContentType: client.queueSettings.QueuePublishSettings.ContentType,
|
||||||
|
Body: data,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// can be used to open additional queues after client opening
|
||||||
|
func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) {
|
||||||
|
if !client.isReady {
|
||||||
|
return amqp.Queue{}, errNotConnected
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.Channel.QueueDeclare(
|
||||||
|
queueDeclareSettings.Name,
|
||||||
|
queueDeclareSettings.Durable,
|
||||||
|
queueDeclareSettings.DeleteWhenUnused,
|
||||||
|
queueDeclareSettings.Exclusive,
|
||||||
|
queueDeclareSettings.NoWait,
|
||||||
|
queueDeclareSettings.Arguments,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// can be used to specify further queue bindings after client opening
|
||||||
|
func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error {
|
||||||
|
if !client.isReady {
|
||||||
|
return errNotConnected
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.Channel.QueueBind(
|
||||||
|
queueBindSettings.QueueName,
|
||||||
|
queueBindSettings.RoutingKey,
|
||||||
|
queueBindSettings.Exchange,
|
||||||
|
queueBindSettings.NoWait,
|
||||||
|
queueBindSettings.Arguments,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consume will continuously put queue items on the channel.
|
||||||
|
// It is required to call delivery.Ack when it has been
|
||||||
|
// successfully processed, or delivery.Nack when it fails.
|
||||||
|
// Ignoring this will cause data to build up on the server.
|
||||||
|
func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) {
|
||||||
|
if !client.isReady {
|
||||||
|
return nil, errNotConnected
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.Channel.Consume(
|
||||||
|
channelConsumeSettings.QueueName,
|
||||||
|
channelConsumeSettings.Consumer,
|
||||||
|
channelConsumeSettings.AutoAck,
|
||||||
|
channelConsumeSettings.Exclusive,
|
||||||
|
channelConsumeSettings.NoLocal,
|
||||||
|
channelConsumeSettings.NoWait,
|
||||||
|
channelConsumeSettings.Arguments,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close will cleanly shutdown the channel and connection.
|
||||||
|
func (client *Client) Close() error {
|
||||||
|
if !client.isReady {
|
||||||
|
return errAlreadyClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
close(client.done)
|
||||||
|
|
||||||
|
err := client.Channel.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = client.connection.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client.isReady = false
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue