parent
3d75bf7950
commit
4eaf7b7b13
483
client.go
483
client.go
|
@ -1,483 +0,0 @@
|
|||
package gocnjhelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var (
|
||||
errNotConnected = errors.New("not connected to a server")
|
||||
errAlreadyClosed = errors.New("already closed: not connected to the server")
|
||||
errShutdown = errors.New("client is shutting down")
|
||||
)
|
||||
|
||||
const (
|
||||
DeliveryModeTransient = amqp.Transient
|
||||
DeliveryModePersistent = amqp.Persistent
|
||||
|
||||
ContentTypeJson = "application/json"
|
||||
|
||||
SetQueueNameToAutomaticallyAssignedQueueName = "SetQueueNameToAutomaticallyAssignedQueueName"
|
||||
Test123 = "test1234"
|
||||
TestHaha = "testhaha"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ReconnectDelay time.Duration
|
||||
ReInitDelay time.Duration
|
||||
ResendDelay time.Duration
|
||||
}
|
||||
|
||||
var ConfigDefault = Config{
|
||||
ReconnectDelay: 5 * time.Second,
|
||||
ReInitDelay: 2 * time.Second,
|
||||
ResendDelay: 5 * time.Second,
|
||||
}
|
||||
|
||||
var ChannelQosSettingsDefault = ChannelQosSettings{
|
||||
PrefetchCount: 1,
|
||||
PrefetchSize: 0,
|
||||
Global: false,
|
||||
}
|
||||
|
||||
type ExchangeSettings struct {
|
||||
Enabled bool
|
||||
Name string
|
||||
Type string
|
||||
Durable bool
|
||||
AutoDeleted bool
|
||||
Internal bool
|
||||
NoWait bool
|
||||
Arguments amqp.Table
|
||||
ExchangePublishSettings ExchangePublishSettings
|
||||
}
|
||||
|
||||
type ExchangePublishSettings struct {
|
||||
Mandatory bool
|
||||
Immediate bool
|
||||
ContentType string
|
||||
}
|
||||
|
||||
type QueueSettings struct {
|
||||
Enabled bool
|
||||
Name string
|
||||
Durable bool
|
||||
DeleteWhenUnused bool
|
||||
Exclusive bool
|
||||
NoWait bool
|
||||
Arguments amqp.Table
|
||||
QueuePublishSettings QueuePublishSettings
|
||||
QueueBindSettings QueueBindSettings
|
||||
}
|
||||
|
||||
type QueuePublishSettings struct {
|
||||
Mandatory bool
|
||||
Immediate bool
|
||||
DeliveryMode uint8
|
||||
ContentType string
|
||||
}
|
||||
|
||||
type ChannelConsumeSettings struct {
|
||||
QueueName string
|
||||
Consumer string
|
||||
AutoAck bool
|
||||
Exclusive bool
|
||||
NoLocal bool
|
||||
NoWait bool
|
||||
Arguments amqp.Table
|
||||
}
|
||||
|
||||
type QueueDeclareSettings struct {
|
||||
Name string
|
||||
Durable bool
|
||||
DeleteWhenUnused bool
|
||||
Exclusive bool
|
||||
NoWait bool
|
||||
Arguments amqp.Table
|
||||
}
|
||||
|
||||
type QueueBindSettings struct {
|
||||
Enabled bool
|
||||
QueueName string
|
||||
RoutingKey string
|
||||
Exchange string
|
||||
NoWait bool
|
||||
Arguments amqp.Table
|
||||
}
|
||||
|
||||
type ChannelQosSettings struct {
|
||||
PrefetchCount int
|
||||
PrefetchSize int
|
||||
Global bool
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
exchangeSettings ExchangeSettings
|
||||
queueSettings QueueSettings
|
||||
channelQosSettings ChannelQosSettings
|
||||
AssignedQueueName string // automatically assigned queue name if none is specified
|
||||
config Config
|
||||
logger zerolog.Logger
|
||||
connection *amqp.Connection
|
||||
Channel *amqp.Channel
|
||||
done chan bool
|
||||
notifyConnClose chan *amqp.Error
|
||||
notifyChanClose chan *amqp.Error
|
||||
notifyConfirm chan amqp.Confirmation
|
||||
isReady bool
|
||||
}
|
||||
|
||||
func GetConnectionString(username string, password string, host string) string {
|
||||
return fmt.Sprintf("amqp://%s:%s@%s/", username, password, host)
|
||||
}
|
||||
|
||||
// New creates a new consumer state instance, and automatically
|
||||
// attempts to connect to the server.
|
||||
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
|
||||
client := Client{
|
||||
logger: zerolog.New(os.Stdout).With().Logger(),
|
||||
exchangeSettings: exchangeSettings,
|
||||
queueSettings: queueSettings,
|
||||
channelQosSettings: channelQosSettings,
|
||||
config: config,
|
||||
done: make(chan bool),
|
||||
}
|
||||
|
||||
go client.handleReconnect(addr)
|
||||
return &client
|
||||
}
|
||||
|
||||
// handleReconnect will wait for a connection error on
|
||||
// notifyConnClose, and then continuously attempt to reconnect.
|
||||
func (client *Client) handleReconnect(addr string) {
|
||||
for {
|
||||
client.isReady = false
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Attempting to connect")
|
||||
|
||||
conn, err := client.connect(addr)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to connect. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
return
|
||||
case <-time.After(client.config.ReconnectDelay):
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if done := client.handleReInit(conn); done {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// connect will create a new AMQP connection
|
||||
func (client *Client) connect(addr string) (*amqp.Connection, error) {
|
||||
conn, err := amqp.Dial(addr)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client.changeConnection(conn)
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connected!")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// handleReconnect will wait for a channel error
|
||||
// and then continuously attempt to re-initialize both channels
|
||||
func (client *Client) handleReInit(conn *amqp.Connection) bool {
|
||||
for {
|
||||
client.isReady = false
|
||||
|
||||
err := client.init(conn)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Failed to initialize channel. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
return true
|
||||
case <-time.After(client.config.ReInitDelay):
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
return true
|
||||
case <-client.notifyConnClose:
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Connection closed. Reconnecting...")
|
||||
return false
|
||||
case <-client.notifyChanClose:
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Channel closed. Re-running init...")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// init will initialize channel & declare queue
|
||||
func (client *Client) init(conn *amqp.Connection) error {
|
||||
ch, err := conn.Channel()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ch.Confirm(false)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = ch.Qos(
|
||||
client.channelQosSettings.PrefetchCount,
|
||||
client.channelQosSettings.PrefetchSize,
|
||||
client.channelQosSettings.Global,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if client.exchangeSettings.Enabled {
|
||||
err = ch.ExchangeDeclare(
|
||||
client.exchangeSettings.Name,
|
||||
client.exchangeSettings.Type,
|
||||
client.exchangeSettings.Durable,
|
||||
client.exchangeSettings.AutoDeleted,
|
||||
client.exchangeSettings.Internal,
|
||||
client.exchangeSettings.NoWait,
|
||||
client.exchangeSettings.Arguments,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if client.queueSettings.Enabled {
|
||||
q, err := ch.QueueDeclare(
|
||||
client.queueSettings.Name,
|
||||
client.queueSettings.Durable,
|
||||
client.queueSettings.DeleteWhenUnused,
|
||||
client.queueSettings.Exclusive,
|
||||
client.queueSettings.NoWait,
|
||||
client.queueSettings.Arguments,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if client.queueSettings.QueueBindSettings.Enabled {
|
||||
queueBindName := client.queueSettings.QueueBindSettings.QueueName
|
||||
|
||||
if client.queueSettings.QueueBindSettings.QueueName == SetQueueNameToAutomaticallyAssignedQueueName {
|
||||
queueBindName = q.Name
|
||||
}
|
||||
|
||||
err = ch.QueueBind(
|
||||
queueBindName,
|
||||
client.queueSettings.QueueBindSettings.RoutingKey,
|
||||
client.queueSettings.QueueBindSettings.Exchange,
|
||||
client.queueSettings.QueueBindSettings.NoWait,
|
||||
client.queueSettings.QueueBindSettings.Arguments,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
client.AssignedQueueName = q.Name
|
||||
}
|
||||
|
||||
client.changeChannel(ch)
|
||||
|
||||
client.isReady = true
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Setup!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// changeConnection takes a new connection to the queue,
|
||||
// and updates the close listener to reflect this.
|
||||
func (client *Client) changeConnection(connection *amqp.Connection) {
|
||||
client.connection = connection
|
||||
client.notifyConnClose = make(chan *amqp.Error, 1)
|
||||
client.connection.NotifyClose(client.notifyConnClose)
|
||||
}
|
||||
|
||||
// changeChannel takes a new channel to the queue,
|
||||
// and updates the channel listeners to reflect this.
|
||||
func (client *Client) changeChannel(channel *amqp.Channel) {
|
||||
client.Channel = channel
|
||||
client.notifyChanClose = make(chan *amqp.Error, 1)
|
||||
client.notifyConfirm = make(chan amqp.Confirmation, 1)
|
||||
client.Channel.NotifyClose(client.notifyChanClose)
|
||||
client.Channel.NotifyPublish(client.notifyConfirm)
|
||||
}
|
||||
|
||||
// Push will push data onto the queue, and wait for a confirm.
|
||||
// If no confirms are received until within the resendTimeout,
|
||||
// it continuously re-sends messages until a confirm is received.
|
||||
// This will block until the server sends a confirm. Errors are
|
||||
// only returned if the push action itself fails, see UnsafePush.
|
||||
func (client *Client) PushExchangeMessage(routingKey string, data []byte) error {
|
||||
if !client.isReady {
|
||||
return errors.New("failed to push: not connected")
|
||||
}
|
||||
|
||||
for {
|
||||
err := client.UnsafePush(data, routingKey)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push failed. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
return errShutdown
|
||||
case <-time.After(client.config.ResendDelay):
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case confirm := <-client.notifyConfirm:
|
||||
if confirm.Ack {
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push confirmed!")
|
||||
return nil
|
||||
}
|
||||
case <-time.After(client.config.ResendDelay):
|
||||
}
|
||||
|
||||
client.logger.Debug().Str("lib", "go-cnj-helper").Msg("Push didn't confirm. Retrying...")
|
||||
}
|
||||
}
|
||||
|
||||
func (client *Client) PushQueueMessage(data []byte) error {
|
||||
return client.PushExchangeMessage("", data)
|
||||
}
|
||||
|
||||
// UnsafePush will push to the queue without checking for
|
||||
// confirmation. It returns an error if it fails to connect.
|
||||
// No guarantees are provided for whether the server will
|
||||
// receive the message.
|
||||
func (client *Client) UnsafePush(data []byte, routingKey string) error {
|
||||
if !client.isReady {
|
||||
return errNotConnected
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// exchange
|
||||
if client.exchangeSettings.Name != "" {
|
||||
return client.Channel.PublishWithContext(
|
||||
ctx,
|
||||
client.exchangeSettings.Name, // exchange
|
||||
routingKey, // routing key
|
||||
client.exchangeSettings.ExchangePublishSettings.Mandatory,
|
||||
client.exchangeSettings.ExchangePublishSettings.Immediate,
|
||||
amqp.Publishing{
|
||||
ContentType: client.exchangeSettings.ExchangePublishSettings.ContentType,
|
||||
Body: data,
|
||||
})
|
||||
}
|
||||
|
||||
// queue
|
||||
return client.Channel.PublishWithContext(
|
||||
ctx,
|
||||
"", // exchange
|
||||
client.queueSettings.Name, // routing key
|
||||
client.queueSettings.QueuePublishSettings.Mandatory,
|
||||
client.queueSettings.QueuePublishSettings.Immediate,
|
||||
amqp.Publishing{
|
||||
DeliveryMode: client.queueSettings.QueuePublishSettings.DeliveryMode,
|
||||
ContentType: client.queueSettings.QueuePublishSettings.ContentType,
|
||||
Body: data,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// can be used to open additional queues after client opening
|
||||
func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) {
|
||||
if !client.isReady {
|
||||
return amqp.Queue{}, errNotConnected
|
||||
}
|
||||
|
||||
return client.Channel.QueueDeclare(
|
||||
queueDeclareSettings.Name,
|
||||
queueDeclareSettings.Durable,
|
||||
queueDeclareSettings.DeleteWhenUnused,
|
||||
queueDeclareSettings.Exclusive,
|
||||
queueDeclareSettings.NoWait,
|
||||
queueDeclareSettings.Arguments,
|
||||
)
|
||||
}
|
||||
|
||||
// can be used to specify further queue bindings after client opening
|
||||
func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error {
|
||||
if !client.isReady {
|
||||
return errNotConnected
|
||||
}
|
||||
|
||||
return client.Channel.QueueBind(
|
||||
queueBindSettings.QueueName,
|
||||
queueBindSettings.RoutingKey,
|
||||
queueBindSettings.Exchange,
|
||||
queueBindSettings.NoWait,
|
||||
queueBindSettings.Arguments,
|
||||
)
|
||||
}
|
||||
|
||||
// Consume will continuously put queue items on the channel.
|
||||
// It is required to call delivery.Ack when it has been
|
||||
// successfully processed, or delivery.Nack when it fails.
|
||||
// Ignoring this will cause data to build up on the server.
|
||||
func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) {
|
||||
if !client.isReady {
|
||||
return nil, errNotConnected
|
||||
}
|
||||
|
||||
return client.Channel.Consume(
|
||||
channelConsumeSettings.QueueName,
|
||||
channelConsumeSettings.Consumer,
|
||||
channelConsumeSettings.AutoAck,
|
||||
channelConsumeSettings.Exclusive,
|
||||
channelConsumeSettings.NoLocal,
|
||||
channelConsumeSettings.NoWait,
|
||||
channelConsumeSettings.Arguments,
|
||||
)
|
||||
}
|
||||
|
||||
// Close will cleanly shutdown the channel and connection.
|
||||
func (client *Client) Close() error {
|
||||
if !client.isReady {
|
||||
return errAlreadyClosed
|
||||
}
|
||||
|
||||
close(client.done)
|
||||
|
||||
err := client.Channel.Close()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = client.connection.Close()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.isReady = false
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue