parent
faff3c27c2
commit
8b73e68c7e
33
client.go
33
client.go
|
@ -4,10 +4,11 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -122,7 +123,7 @@ type Client struct {
|
|||
channelQosSettings ChannelQosSettings
|
||||
AssignedQueueName string // automatically assigned queue name if none is specified
|
||||
config Config
|
||||
logger *logrus.Logger
|
||||
logger zerolog.Logger
|
||||
connection *amqp.Connection
|
||||
Channel *amqp.Channel
|
||||
done chan bool
|
||||
|
@ -138,9 +139,9 @@ func GetConnectionString(username string, password string, host string) string {
|
|||
|
||||
// New creates a new consumer state instance, and automatically
|
||||
// attempts to connect to the server.
|
||||
func NewClient(debugMode bool, exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
|
||||
func NewClient(exchangeSettings ExchangeSettings, queueSettings QueueSettings, channelQosSettings ChannelQosSettings, config Config, addr string) *Client {
|
||||
client := Client{
|
||||
logger: logrus.New(),
|
||||
logger: zerolog.New(os.Stdout).With().Logger(),
|
||||
exchangeSettings: exchangeSettings,
|
||||
queueSettings: queueSettings,
|
||||
channelQosSettings: channelQosSettings,
|
||||
|
@ -148,10 +149,6 @@ func NewClient(debugMode bool, exchangeSettings ExchangeSettings, queueSettings
|
|||
done: make(chan bool),
|
||||
}
|
||||
|
||||
if debugMode {
|
||||
client.logger.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
|
||||
go client.handleReconnect(addr)
|
||||
return &client
|
||||
}
|
||||
|
@ -161,12 +158,12 @@ func NewClient(debugMode bool, exchangeSettings ExchangeSettings, queueSettings
|
|||
func (client *Client) handleReconnect(addr string) {
|
||||
for {
|
||||
client.isReady = false
|
||||
client.logger.Debugln("Attempting to connect")
|
||||
client.logger.Debug().Msg("Attempting to connect")
|
||||
|
||||
conn, err := client.connect(addr)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debugln("Failed to connect. Retrying...")
|
||||
client.logger.Debug().Msg("Failed to connect. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
|
@ -191,7 +188,7 @@ func (client *Client) connect(addr string) (*amqp.Connection, error) {
|
|||
}
|
||||
|
||||
client.changeConnection(conn)
|
||||
client.logger.Debugln("Connected!")
|
||||
client.logger.Debug().Msg("Connected!")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
|
@ -204,7 +201,7 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
|
|||
err := client.init(conn)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debugln("Failed to initialize channel. Retrying...")
|
||||
client.logger.Debug().Msg("Failed to initialize channel. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
|
@ -218,10 +215,10 @@ func (client *Client) handleReInit(conn *amqp.Connection) bool {
|
|||
case <-client.done:
|
||||
return true
|
||||
case <-client.notifyConnClose:
|
||||
client.logger.Debugln("Connection closed. Reconnecting...")
|
||||
client.logger.Debug().Msg("Connection closed. Reconnecting...")
|
||||
return false
|
||||
case <-client.notifyChanClose:
|
||||
client.logger.Debugln("Channel closed. Re-running init...")
|
||||
client.logger.Debug().Msg("Channel closed. Re-running init...")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -304,7 +301,7 @@ func (client *Client) init(conn *amqp.Connection) error {
|
|||
client.changeChannel(ch)
|
||||
|
||||
client.isReady = true
|
||||
client.logger.Debugln("Setup!")
|
||||
client.logger.Debug().Msg("Setup!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -341,7 +338,7 @@ func (client *Client) PushExchangeMessage(routingKey string, data []byte) error
|
|||
err := client.UnsafePush(data, routingKey)
|
||||
|
||||
if err != nil {
|
||||
client.logger.Debugln("Push failed. Retrying...")
|
||||
client.logger.Debug().Msg("Push failed. Retrying...")
|
||||
|
||||
select {
|
||||
case <-client.done:
|
||||
|
@ -354,13 +351,13 @@ func (client *Client) PushExchangeMessage(routingKey string, data []byte) error
|
|||
select {
|
||||
case confirm := <-client.notifyConfirm:
|
||||
if confirm.Ack {
|
||||
client.logger.Debugln("Push confirmed!")
|
||||
client.logger.Debug().Msg("Push confirmed!")
|
||||
return nil
|
||||
}
|
||||
case <-time.After(client.config.ResendDelay):
|
||||
}
|
||||
|
||||
client.logger.Debugln("Push didn't confirm. Retrying...")
|
||||
client.logger.Debug().Msg("Push didn't confirm. Retrying...")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
8
go.mod
8
go.mod
|
@ -2,11 +2,13 @@ module git.clickandjoin.umbach.dev/ClickandJoin/go-rabbitmq-client
|
|||
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/rabbitmq/amqp091-go v1.5.0
|
||||
github.com/rs/zerolog v1.29.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
github.com/rabbitmq/amqp091-go v1.5.0 // indirect
|
||||
github.com/rs/zerolog v1.29.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
)
|
||||
|
|
6
go.sum
6
go.sum
|
@ -1,6 +1,5 @@
|
|||
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
|
@ -19,11 +18,10 @@ github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQf
|
|||
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.29.0 h1:Zes4hju04hjbvkVkOhdl2HpZa+0PmVwigmo8XoORE5w=
|
||||
github.com/rs/zerolog v1.29.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
|
||||
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
|
||||
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
@ -42,8 +40,6 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
|
|
Loading…
Reference in New Issue