added rabbitmq
parent
3b59f215c8
commit
0106ac298b
|
@ -8,3 +8,8 @@ SCYLLADB_HOST=127.0.0.1
|
|||
SCYLLADB_USERNAME=user
|
||||
SCYLLADB_PASSWORD=password
|
||||
SCYLLADB_KEYSPACE=keyspace
|
||||
|
||||
# RabbitMQ
|
||||
RABBITMQ_HOST=127.0.0.1
|
||||
RABBITMQ_USERNAME=guest
|
||||
RABBITMQ_PASSWORD=guest
|
4
go.mod
4
go.mod
|
@ -3,7 +3,7 @@ module clickandjoin.app/managementsystem
|
|||
go 1.18
|
||||
|
||||
require (
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.39
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43
|
||||
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1
|
||||
github.com/gofiber/fiber/v2 v2.42.0
|
||||
github.com/joho/godotenv v1.5.0
|
||||
|
@ -20,7 +20,7 @@ require (
|
|||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.14 // indirect
|
||||
github.com/philhofer/fwd v1.1.2 // indirect
|
||||
github.com/rabbitmq/amqp091-go v1.5.0 // indirect
|
||||
github.com/rabbitmq/amqp091-go v1.7.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.3 // indirect
|
||||
github.com/rs/zerolog v1.29.0 // indirect
|
||||
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect
|
||||
|
|
15
go.sum
15
go.sum
|
@ -2,6 +2,12 @@ git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.36 h1:tqx39cmvtID3lb
|
|||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.36/go.mod h1:RnqJsFYGR0cdxbeQjB1dDlr/MeKgnMkc7XoZtFDhMbo=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.39 h1:H9YLpd4hvd0wXzKW050cQIHCg6rTchfKTzxp1dgXYLY=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.39/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.41 h1:VuB46ELSlxz3wKSNenic9q8XBjoZ1yMHG6/hA3XJBe8=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.41/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42 h1:eiZ3QgIEYjziYExD+h4moROVR5KlXdN/cv6NVJmJ6t8=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.42/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43 h1:5CtlOuz7EWOOYU9SyI7tSYrFpNHm4zmwR+tQ2KhH4rw=
|
||||
git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.43/go.mod h1:Jzc4/4ntrOLMOZYnUjkr1uBCVtRvPbEbQD+8kwBOdf4=
|
||||
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
|
||||
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
|
||||
|
@ -10,6 +16,7 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
|
|||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 h1:px9qUCy/RNJNsfCam4m2IxWGxNuimkrioEF0vrrbPsg=
|
||||
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
|
@ -49,6 +56,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg=
|
||||
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
|
||||
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
|
||||
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
|
||||
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
|
@ -65,8 +74,11 @@ github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCM
|
|||
github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUqQUg=
|
||||
github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
|
||||
github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0=
|
||||
github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw=
|
||||
|
@ -81,6 +93,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
|
|||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
|
||||
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
|
@ -148,3 +162,4 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
|
|||
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
5
main.go
5
main.go
|
@ -16,6 +16,7 @@ package main
|
|||
|
||||
import (
|
||||
"clickandjoin.app/managementsystem/modules/config"
|
||||
"clickandjoin.app/managementsystem/modules/rabbitmq"
|
||||
"clickandjoin.app/managementsystem/modules/scylladb"
|
||||
"clickandjoin.app/managementsystem/routers/router"
|
||||
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
|
||||
|
@ -26,9 +27,11 @@ import (
|
|||
func init() {
|
||||
config.LoadConfig()
|
||||
|
||||
gocnjhelper.InitLogger(config.Cfg.Debug, true, false)
|
||||
gocnjhelper.InitLogger(config.Cfg.Debug, true, false, "", "", "")
|
||||
|
||||
scylladb.InitDatabase()
|
||||
|
||||
go rabbitmq.Init()
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
|
|
@ -16,6 +16,7 @@ type Config struct {
|
|||
Port string
|
||||
ManagementSystemApiKey string
|
||||
ScyllaDB ScyllaDB
|
||||
RabbitMq RabbitMq
|
||||
}
|
||||
|
||||
type ScyllaDB struct {
|
||||
|
@ -25,6 +26,12 @@ type ScyllaDB struct {
|
|||
Keyspace string
|
||||
}
|
||||
|
||||
type RabbitMq struct {
|
||||
Host string
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
func LoadConfig() {
|
||||
// argument to start the server locally for development
|
||||
if len(os.Args) > 1 {
|
||||
|
@ -52,6 +59,11 @@ func LoadConfig() {
|
|||
Password: os.Getenv("SCYLLADB_PASSWORD"),
|
||||
Keyspace: os.Getenv("SCYLLADB_KEYSPACE"),
|
||||
},
|
||||
RabbitMq: RabbitMq{
|
||||
Host: os.Getenv("RABBITMQ_HOST"),
|
||||
Username: os.Getenv("RABBITMQ_USERNAME"),
|
||||
Password: os.Getenv("RABBITMQ_PASSWORD"),
|
||||
},
|
||||
}
|
||||
|
||||
Cfg = cfg
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func LogsMessagesHandling() {
|
||||
msgs, err := logsMessagesConsumer()
|
||||
|
||||
if err != nil {
|
||||
gocnjhelper.LogErrorf("Failed to create logs queue and binding consumer, err: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
channelClosedChannel := make(chan *amqp.Error, 1)
|
||||
LogsClient.Channel.NotifyClose(channelClosedChannel)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-channelClosedChannel:
|
||||
// This case handles the event of closed channel e.g. abnormal shutdown
|
||||
msgs, err = logsMessagesConsumer()
|
||||
|
||||
if err != nil {
|
||||
// If the AMQP channel is not ready, it will continue the loop. Next
|
||||
// iteration will enter this case because chClosedCh is closed by the
|
||||
// library
|
||||
continue
|
||||
}
|
||||
|
||||
gocnjhelper.LogDebug("Re-set logs channel")
|
||||
|
||||
// Re-set channel to receive notifications
|
||||
// The library closes this channel after abnormal shutdown
|
||||
channelClosedChannel = make(chan *amqp.Error, 1)
|
||||
LogsClient.Channel.NotifyClose(channelClosedChannel)
|
||||
|
||||
case msg := <-msgs:
|
||||
gocnjhelper.LogDebugf("RECV msg %s", msg)
|
||||
|
||||
logMessage := gocnjhelper.RabbitMqLogMessage{}
|
||||
|
||||
if err := json.Unmarshal(msg.Body, &logMessage); err != nil {
|
||||
gocnjhelper.LogErrorf("Failed to unmarshal json message, err: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info().Msgf("msg", logMessage)
|
||||
|
||||
msg.Ack(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logsMessagesConsumer() (msgs <-chan amqp.Delivery, err error) {
|
||||
return LogsClient.ConsumeChannelMessages(gocnjhelper.ChannelConsumeSettings{
|
||||
QueueName: queueLogs,
|
||||
Consumer: "",
|
||||
AutoAck: false,
|
||||
Exclusive: false,
|
||||
NoLocal: false,
|
||||
NoWait: false,
|
||||
Arguments: nil,
|
||||
})
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"clickandjoin.app/managementsystem/modules/config"
|
||||
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
|
||||
)
|
||||
|
||||
const queueLogs = "cnj.logs"
|
||||
|
||||
var LogsClient *gocnjhelper.Client
|
||||
|
||||
func Init() {
|
||||
cfg := config.Cfg
|
||||
|
||||
LogsClient = gocnjhelper.NewClient(
|
||||
gocnjhelper.ExchangeSettings{},
|
||||
gocnjhelper.QueueSettings{},
|
||||
gocnjhelper.ChannelQosSettingsDefault,
|
||||
gocnjhelper.Config{
|
||||
ReconnectDelay: 1 * time.Second,
|
||||
ReInitDelay: 1 * time.Second,
|
||||
ResendDelay: 5 * time.Second,
|
||||
},
|
||||
gocnjhelper.GetConnectionString(
|
||||
cfg.RabbitMq.Username,
|
||||
cfg.RabbitMq.Password,
|
||||
cfg.RabbitMq.Host))
|
||||
|
||||
<-time.After(time.Second)
|
||||
|
||||
LogsMessagesHandling()
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package structs
|
||||
|
||||
type RabbitMqMessage struct {
|
||||
Cmd int
|
||||
Rec string
|
||||
Body any
|
||||
}
|
||||
|
||||
type RabbitMqMailMessage struct {
|
||||
UserMail string `json:"m"`
|
||||
TemplateId string `json:"t"`
|
||||
LanguageId string `json:"l"`
|
||||
BodyData map[string]interface{} `json:"b"`
|
||||
}
|
Loading…
Reference in New Issue