message handling

alpha
alex 2022-12-17 23:47:51 +01:00
parent 34ba77be10
commit cf5e6a2904
4 changed files with 88 additions and 164 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"os" "os"
"time"
"clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/config"
"clickandjoin.app/websocketserver/modules/rabbitmq" "clickandjoin.app/websocketserver/modules/rabbitmq"
@ -37,6 +38,10 @@ func main() {
return fiber.ErrUpgradeRequired return fiber.ErrUpgradeRequired
}) })
// wait so that rabbitmq can connect
// TODO: better way to handle this
time.Sleep(500 * time.Millisecond)
go socketserver.RunHub() go socketserver.RunHub()
socketserver.WebSocketServer(app) socketserver.WebSocketServer(app)

View File

@ -9,6 +9,7 @@ import (
"clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/config"
"clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/structs"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -18,6 +19,8 @@ var Channel *amqp.Channel
var MessagesQueue amqp.Queue var MessagesQueue amqp.Queue
const exchangeWebsocketMessages = "cnj.websocketserver.messages"
func getConnectionString() string { func getConnectionString() string {
cfg := &config.Cfg.RabbitMq cfg := &config.Cfg.RabbitMq
@ -39,65 +42,10 @@ func Init() {
Channel = ch Channel = ch
//declareQueue(ch, "messages", &MessagesQueue) // creates a new exchange if one does not already exist
/*
err = ch.ExchangeDeclare(
"messages", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare exchange, err:", err)
}
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logrus.Fatalln("Failed to declare queue, err:", err)
}
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"messages", // exchange
false,
nil,
)
if err != nil {
logrus.Fatalln("Failed to bind queue, err:", err)
} */
/*msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to register consumer, err:", err)
} */
// test
err = ch.ExchangeDeclare( err = ch.ExchangeDeclare(
"test", // name exchangeWebsocketMessages, // name
"direct", // type "direct", // type
true, // durable true, // durable
false, // auto-deleted false, // auto-deleted
@ -109,59 +57,9 @@ func Init() {
if err != nil { if err != nil {
logrus.Fatalln("Failed to declare exchange, err:", err) logrus.Fatalln("Failed to declare exchange, err:", err)
} }
// listening for messages
/*
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
socketclients.BroadcastMessage(d.Body)
}
}()
log.Printf(" [*] Waiting for messages")
<-forever */
/*
msgs, err := ch.Consume(
MessagesQueue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logrus.Fatalln("Failed to consume msgs, err:", err)
}
var forever chan struct{}
go func() {
for d := range msgs {
logrus.Println("Received msg", d.Body, string(d.Body))
}
}()
logrus.Println("Waiting for messages")
<-forever */
} }
var msgs chan amqp.Delivery func CreateClientBinding(socketClient *structs.SocketClient, id string) (queueName string, err error) {
func Listener() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}
func CreateClientBinding(id string) (queueName string, err error) {
q, err := Channel.QueueDeclare( q, err := Channel.QueueDeclare(
"", // name "", // name
false, // durable false, // durable
@ -177,21 +75,24 @@ func CreateClientBinding(id string) (queueName string, err error) {
} }
err = Channel.QueueBind( err = Channel.QueueBind(
q.Name, q.Name, // name
id, id, // key
"test", exchangeWebsocketMessages, // exchange
false, false, // no-wait
nil, nil, // arguments
) )
if err != nil { if err != nil {
logrus.Println("Failed to bind queue, err:", err) logrus.Errorln("Failed to bind queue, err:", err)
return "", err return "", err
} }
// set an ID for the consumer to delete the consumer after the socket user has disconnected
socketClient.RabbitMqConsumerId = uuid.New().String()
msgs, err := Channel.Consume( msgs, err := Channel.Consume(
q.Name, // queue q.Name, // queue
"", // consumer socketClient.RabbitMqConsumerId, // consumer
true, // auto ack true, // auto ack
false, // exclusive false, // exclusive
false, // no local false, // no local
@ -200,36 +101,49 @@ func CreateClientBinding(id string) (queueName string, err error) {
) )
if err != nil { if err != nil {
logrus.Println("Failed to register consumer, err:", err) logrus.Errorln("Failed to register consumer, err:", err)
return "", err return "", err
} }
var forever chan struct{}
go func() { go func() {
for d := range msgs { for d := range msgs {
log.Printf(" [x] %s", d.Body) log.Printf(" [x] %s", d.Body)
var receivedMessage structs.ReceivedSocketMessage
err := json.Unmarshal(d.Body, &receivedMessage)
if err != nil {
logrus.Errorln("Failed to unmarshal received msg, err:", err)
}
logrus.Debugln("RABBITMQ RECEIVED MESSAGE:", receivedMessage)
err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
if err != nil {
logrus.Errorln("Failed to send message to client, err:", err)
}
} }
}() }()
log.Printf(" [*] Waiting for logs")
<-forever
logrus.Println("binded", id, q.Name)
return q.Name, nil return q.Name, nil
} }
func DeleteClientBinding(qName string, id string) error { func DeleteClient(consumerId string, qName string) error {
err := Channel.QueueUnbind( // delete consumer
qName, err := Channel.Cancel(consumerId, false)
id,
"test",
nil,
)
if err != nil { if err != nil {
logrus.Println("Failed to unbind queue, err:", err) logrus.Errorln("Failed to delete consumer, err:", err)
return err
}
// delete queue and queue binding
_, err = Channel.QueueDelete(qName, false, true, false)
if err != nil {
logrus.Errorln("Failed to delete queue, err:", err)
return err return err
} }
@ -248,7 +162,7 @@ func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
defer cancel() defer cancel()
err = Channel.PublishWithContext(ctx, err = Channel.PublishWithContext(ctx,
"test", // exchange exchangeWebsocketMessages, // exchange
rabbitMqMessage.Rec, // routing key rabbitMqMessage.Rec, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
@ -263,7 +177,6 @@ func PublishClientMessage(rabbitMqMessage structs.RabbitMqMessage) error {
} }
return nil return nil
} }
func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error { func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error {

View File

@ -11,7 +11,8 @@ import (
type SocketClient struct { type SocketClient struct {
Conn *websocket.Conn Conn *websocket.Conn
connMu sync.Mutex connMu sync.Mutex
QName string RabbitMqQueueName string
RabbitMqConsumerId string
} }
type SocketMessage struct { type SocketMessage struct {

View File

@ -22,23 +22,25 @@ func RunHub() {
case newSocketClient := <-register: case newSocketClient := <-register:
uuid := uuid.New().String() uuid := uuid.New().String()
qName, err := rabbitmq.CreateClientBinding(uuid) queueName, err := rabbitmq.CreateClientBinding(newSocketClient, uuid)
logrus.Println("qName", qName) if err != nil {
logrus.Fatalln("Failed to create client binding, err:", err)
if err == nil { break
newSocketClient.QName = qName
} }
newSocketClient.RabbitMqQueueName = queueName
cache.SocketClients[uuid] = newSocketClient cache.SocketClients[uuid] = newSocketClient
logrus.Println("register client", uuid) logrus.Debugln("REGISTER CLIENT:", uuid)
// for testing // for testing
marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid}) marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid})
if err != nil { if err != nil {
logrus.Errorln("Failed to marshal uuid, err:", err) logrus.Errorln("Failed to marshal uuid, err:", err)
break
} }
newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled)) newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled))
@ -52,30 +54,33 @@ func RunHub() {
logrus.Errorln("Failed err:", err) logrus.Errorln("Failed err:", err)
} }
logrus.Println(recMsg) logrus.Debugln("RECEIVED MESSAGE:", recMsg)
if recMsg.Rec != "" { if recMsg.Rec != "" {
isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec) isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec)
// send message to target receiver when connected to this server // send message to target receiver when connected to this server
if isConnected { if isConnected {
logrus.Debugln("FORWARDING MESSAGE: receiver is on connected to this server")
recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body}) recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body})
} else { } else {
logrus.Println("rec not found") logrus.Debugln("FORWARDING MESSAGE: receiver connected to other server")
//rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body}) err = rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body})
rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body})
if err != nil {
logrus.Fatalln("Failed to publish client message, err:", err)
}
} }
} }
case connection := <-unregister: case connection := <-unregister:
logrus.Println("unregister", connection)
for id, client := range cache.SocketClients { for id, client := range cache.SocketClients {
if connection == client.Conn { if connection == client.Conn {
logrus.Debugln("UNREGISTER CLIENT:", id)
delete(cache.SocketClients, id) delete(cache.SocketClients, id)
rabbitmq.DeleteClientBinding(client.QName, id) rabbitmq.DeleteClient(client.RabbitMqConsumerId, client.RabbitMqQueueName)
} }
} }
} }