150 lines
2.6 KiB
Go
150 lines
2.6 KiB
Go
package rabbitmq
|
|
|
|
import (
	"fmt"
	"log"
	"net/url"

	"clickandjoin.app/websocketserver/modules/config"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sirupsen/logrus"
)
|
|
|
|
var Conn *amqp.Connection
|
|
var Channel *amqp.Channel
|
|
|
|
var MessagesQueue amqp.Queue
|
|
|
|
func getConnectionString() string {
|
|
cfg := &config.Cfg.RabbitMq
|
|
|
|
return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host)
|
|
}
|
|
|
|
func Init() {
|
|
conn, err := amqp.Dial(getConnectionString())
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("RabbitMQ connection failed, err:", err)
|
|
}
|
|
|
|
ch, err := conn.Channel()
|
|
|
|
if err != nil {
|
|
logrus.Fatalln(err)
|
|
}
|
|
|
|
Channel = ch
|
|
|
|
//declareQueue(ch, "messages", &MessagesQueue)
|
|
|
|
err = ch.ExchangeDeclare(
|
|
"messages", // name
|
|
"fanout", // type
|
|
true, // durable
|
|
false, // auto-deleted
|
|
false, // internal
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("Failed to declare exchange, err:", err)
|
|
}
|
|
|
|
q, err := ch.QueueDeclare(
|
|
"", // name
|
|
false, // durable
|
|
false, // delete when unused
|
|
true, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("Failed to declare queue, err:", err)
|
|
}
|
|
|
|
err = ch.QueueBind(
|
|
q.Name, // queue name
|
|
"", // routing key
|
|
"messages", // exchange
|
|
false,
|
|
nil,
|
|
)
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("Failed to bind queue, err:", err)
|
|
}
|
|
|
|
msgs, err := ch.Consume(
|
|
q.Name, // queue
|
|
"", // consumer
|
|
true, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("Failed to register consumer, err:", err)
|
|
}
|
|
|
|
var forever chan struct{}
|
|
|
|
go func() {
|
|
for d := range msgs {
|
|
log.Printf(" [x] %s", d.Body)
|
|
}
|
|
}()
|
|
|
|
log.Printf(" [*] Waiting for messages")
|
|
<-forever
|
|
|
|
/*
|
|
|
|
msgs, err := ch.Consume(
|
|
MessagesQueue.Name, // queue
|
|
"", // consumer
|
|
true, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
|
|
if err != nil {
|
|
logrus.Fatalln("Failed to consume msgs, err:", err)
|
|
}
|
|
|
|
var forever chan struct{}
|
|
|
|
go func() {
|
|
for d := range msgs {
|
|
logrus.Println("Received msg", d.Body, string(d.Body))
|
|
}
|
|
}()
|
|
|
|
logrus.Println("Waiting for messages")
|
|
<-forever */
|
|
}
|
|
|
|
/*
|
|
func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) {
|
|
q, err := channel.QueueDeclare(
|
|
name, // name
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
|
|
if err != nil {
|
|
log.Fatalln("Failed to declare a queue", err)
|
|
}
|
|
|
|
*queue = q
|
|
}
|
|
*/
|