rabbitmq routing

alpha
alex 2022-12-12 22:42:58 +01:00
parent 6dbf063a22
commit 34ba77be10
3 changed files with 188 additions and 46 deletions

View File

@ -9,7 +9,6 @@ import (
"clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/config"
"clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/socketclients"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -41,7 +40,7 @@ func Init() {
Channel = ch Channel = ch
//declareQueue(ch, "messages", &MessagesQueue) //declareQueue(ch, "messages", &MessagesQueue)
/*
err = ch.ExchangeDeclare( err = ch.ExchangeDeclare(
"messages", // name "messages", // name
"fanout", // type "fanout", // type
@ -79,9 +78,9 @@ func Init() {
if err != nil { if err != nil {
logrus.Fatalln("Failed to bind queue, err:", err) logrus.Fatalln("Failed to bind queue, err:", err)
} } */
msgs, err := ch.Consume( /*msgs, err := ch.Consume(
q.Name, // queue q.Name, // queue
"", // consumer "", // consumer
true, // auto-ack true, // auto-ack
@ -93,8 +92,26 @@ func Init() {
if err != nil { if err != nil {
logrus.Fatalln("Failed to register consumer, err:", err) logrus.Fatalln("Failed to register consumer, err:", err)
} */
// test
err = ch.ExchangeDeclare(
"test", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare exchange, err:", err)
} }
// listening for messages
/*
var forever chan struct{} var forever chan struct{}
go func() { go func() {
@ -106,7 +123,7 @@ func Init() {
}() }()
log.Printf(" [*] Waiting for messages") log.Printf(" [*] Waiting for messages")
<-forever <-forever */
/* /*
@ -136,6 +153,119 @@ func Init() {
<-forever */ <-forever */
} }
var msgs chan amqp.Delivery
func Listener() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}
func CreateClientBinding(id string) (queueName string, err error) {
q, err := Channel.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Errorln("Failed to declare queue, err:", err)
return "", err
}
err = Channel.QueueBind(
q.Name,
id,
"test",
false,
nil,
)
if err != nil {
logrus.Println("Failed to bind queue, err:", err)
return "", err
}
msgs, err := Channel.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
logrus.Println("Failed to register consumer, err:", err)
return "", err
}
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs")
<-forever
logrus.Println("binded", id, q.Name)
return q.Name, nil
}
func DeleteClientBinding(qName string, id string) error {
err := Channel.QueueUnbind(
qName,
id,
"test",
nil,
)
if err != nil {
logrus.Println("Failed to unbind queue, err:", err)
return err
}
return nil
}
func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage)
if err != nil {
logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = Channel.PublishWithContext(ctx,
"test", // exchange
rabbitMqMessage.Rec, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: msg,
})
if err != nil {
logrus.Errorln("Failed to publish client msg, err:", err)
return err
}
return nil
}
func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage) msg, err := json.Marshal(rabbitMqMessage)

View File

@ -11,6 +11,7 @@ import (
type SocketClient struct { type SocketClient struct {
Conn *websocket.Conn Conn *websocket.Conn
connMu sync.Mutex connMu sync.Mutex
QName string
} }
type SocketMessage struct { type SocketMessage struct {

View File

@ -22,6 +22,14 @@ func RunHub() {
case newSocketClient := <-register: case newSocketClient := <-register:
uuid := uuid.New().String() uuid := uuid.New().String()
qName, err := rabbitmq.CreateClientBinding(uuid)
logrus.Println("qName", qName)
if err == nil {
newSocketClient.QName = qName
}
cache.SocketClients[uuid] = newSocketClient cache.SocketClients[uuid] = newSocketClient
logrus.Println("register client", uuid) logrus.Println("register client", uuid)
@ -55,7 +63,8 @@ func RunHub() {
} else { } else {
logrus.Println("rec not found") logrus.Println("rec not found")
rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) //rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body})
rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body})
} }
} }
@ -65,6 +74,8 @@ func RunHub() {
for id, client := range cache.SocketClients { for id, client := range cache.SocketClients {
if connection == client.Conn { if connection == client.Conn {
delete(cache.SocketClients, id) delete(cache.SocketClients, id)
rabbitmq.DeleteClientBinding(client.QName, id)
} }
} }
} }