added func to manually declare queue and bind queue

alpha v1.0.13
alex 2023-01-08 11:34:48 +01:00
parent e65a271af0
commit 2b4a1de3cf
1 changed files with 90 additions and 36 deletions

126
client.go
View File

@ -38,6 +38,12 @@ var ConfigDefault = Config{
ResendDelay: 5 * time.Second,
}
var ChannelQosSettingsDefault = ChannelQosSettings{
PrefetchCount: 1,
PrefetchSize: 0,
Global: false,
}
type ExchangeSettings struct {
Name string
Type string
@ -63,7 +69,6 @@ type QueueSettings struct {
NoWait bool
Arguments amqp.Table
QueuePublishSettings QueuePublishSettings
QueueConsumeSettings QueueConsumeSettings
QueueBindSettings QueueBindSettings
}
@ -74,7 +79,8 @@ type QueuePublishSettings struct {
ContentType string
}
type QueueConsumeSettings struct {
type ChannelConsumeSettings struct {
QueueName string
Consumer string
AutoAck bool
Exclusive bool
@ -83,6 +89,15 @@ type QueueConsumeSettings struct {
Arguments amqp.Table
}
type QueueDeclareSettings struct {
Name string
Durable bool
DeleteWhenUnused bool
Exclusive bool
NoWait bool
Arguments amqp.Table
}
type QueueBindSettings struct {
Enabled bool
QueueName string
@ -92,19 +107,26 @@ type QueueBindSettings struct {
Arguments amqp.Table
}
type ChannelQosSettings struct {
PrefetchCount int
PrefetchSize int
Global bool
}
type Client struct {
exchangeSettings ExchangeSettings
queueSettings QueueSettings
AssignedQueueName string // automatically assigned queue name if none is specified
config Config
logger *log.Logger
connection *amqp.Connection
Channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isReady bool
exchangeSettings ExchangeSettings
queueSettings QueueSettings
channelQosSettings ChannelQosSettings
AssignedQueueName string // automatically assigned queue name if none is specified
config Config
logger *log.Logger
connection *amqp.Connection
Channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isReady bool
}
func GetConnectionString(username string, password string, host string) string {
@ -113,13 +135,14 @@ func GetConnectionString(username string, password string, host string) string {
// New creates a new consumer state instance, and automatically
// attempts to connect to the server.
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, config Config, addr string) *Client {
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
client := Client{
logger: log.New(os.Stdout, "", log.LstdFlags),
exchangeSettings: exchangeSettings,
queueSettings: queueSettings,
config: config,
done: make(chan bool),
logger: log.New(os.Stdout, "", log.LstdFlags),
exchangeSettings: exchangeSettings,
queueSettings: queueSettings,
channelQosSettings: channelQosSettings,
config: config,
done: make(chan bool),
}
go client.handleReconnect(addr)
@ -210,6 +233,14 @@ func (client *Client) init(conn *amqp.Connection) error {
return err
}
if err = ch.Qos(
client.channelQosSettings.PrefetchCount,
client.channelQosSettings.PrefetchSize,
client.channelQosSettings.Global,
); err != nil {
return err
}
if client.exchangeSettings.Name != "" {
err = ch.ExchangeDeclare(
client.exchangeSettings.Name,
@ -371,31 +402,54 @@ func (client *Client) UnsafePush(data []byte, routingKey string) error {
)
}
// can be used to open additional queues after client opening
func (client *Client) QueueDeclare(queueDeclareSettings QueueDeclareSettings) (amqp.Queue, error) {
if !client.isReady {
return amqp.Queue{}, errNotConnected
}
return client.Channel.QueueDeclare(
queueDeclareSettings.Name,
queueDeclareSettings.Durable,
queueDeclareSettings.DeleteWhenUnused,
queueDeclareSettings.Exclusive,
queueDeclareSettings.NoWait,
queueDeclareSettings.Arguments,
)
}
// can be used to specify further queue bindings after client opening
func (client *Client) QueueBind(queueBindSettings QueueBindSettings) error {
if !client.isReady {
return errNotConnected
}
return client.Channel.QueueBind(
queueBindSettings.QueueName,
queueBindSettings.RoutingKey,
queueBindSettings.Exchange,
queueBindSettings.NoWait,
queueBindSettings.Arguments,
)
}
// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) ConsumeQueueMessages() (<-chan amqp.Delivery, error) {
func (client *Client) ConsumeChannelMessages(channelConsumeSettings ChannelConsumeSettings) (<-chan amqp.Delivery, error) {
if !client.isReady {
return nil, errNotConnected
}
if err := client.Channel.Qos(
1,
0,
false,
); err != nil {
return nil, err
}
return client.Channel.Consume(
client.queueSettings.Name,
client.queueSettings.QueueConsumeSettings.Consumer,
client.queueSettings.QueueConsumeSettings.AutoAck,
client.queueSettings.QueueConsumeSettings.Exclusive,
client.queueSettings.QueueConsumeSettings.NoLocal,
client.queueSettings.QueueConsumeSettings.NoWait,
client.queueSettings.QueueConsumeSettings.Arguments,
channelConsumeSettings.QueueName,
channelConsumeSettings.Consumer,
channelConsumeSettings.AutoAck,
channelConsumeSettings.Exclusive,
channelConsumeSettings.NoLocal,
channelConsumeSettings.NoWait,
channelConsumeSettings.Arguments,
)
}