handling messages

alpha
alex 2022-12-11 22:47:37 +01:00
parent aa02a460c2
commit 6dbf063a22
12 changed files with 185 additions and 26 deletions

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/fasthttp/websocket v1.5.0 // indirect github.com/fasthttp/websocket v1.5.0 // indirect
github.com/gofiber/fiber/v2 v2.40.1 // indirect github.com/gofiber/fiber/v2 v2.40.1 // indirect
github.com/gofiber/websocket/v2 v2.1.2 // indirect github.com/gofiber/websocket/v2 v2.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.15.12 // indirect github.com/klauspost/compress v1.15.12 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-isatty v0.0.16 // indirect

1
go.sum
View File

@ -8,6 +8,7 @@ github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pI
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
github.com/gofiber/websocket/v2 v2.1.2 h1:EulKyLB/fJgui5+6c8irwEnYQ9FRsrLZfkrq9OfTDGc= github.com/gofiber/websocket/v2 v2.1.2 h1:EulKyLB/fJgui5+6c8irwEnYQ9FRsrLZfkrq9OfTDGc=
github.com/gofiber/websocket/v2 v2.1.2/go.mod h1:S+sKWo0xeC7Wnz5h4/8f6D/NxsrLFIdWDYB3SyVO9pE= github.com/gofiber/websocket/v2 v2.1.2/go.mod h1:S+sKWo0xeC7Wnz5h4/8f6D/NxsrLFIdWDYB3SyVO9pE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"log"
"os" "os"
"clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/config"
@ -43,7 +42,7 @@ func main() {
socketserver.WebSocketServer(app) socketserver.WebSocketServer(app)
if len(os.Args) < 2 { if len(os.Args) < 2 {
log.Fatalln("Please specify port") logrus.Fatalln("Please specify port")
} }
app.Listen("127.0.0.1:" + os.Args[1]) app.Listen("127.0.0.1:" + os.Args[1])

View File

@ -2,4 +2,4 @@ package cache
import "clickandjoin.app/websocketserver/modules/structs" import "clickandjoin.app/websocketserver/modules/structs"
var WebSocketClients = make(map[string]*structs.SocketClient) var SocketClients = make(map[string]*structs.SocketClient)

View File

@ -1,10 +1,15 @@
package rabbitmq package rabbitmq
import ( import (
"context"
"encoding/json"
"fmt" "fmt"
"log" "log"
"time"
"clickandjoin.app/websocketserver/modules/config" "clickandjoin.app/websocketserver/modules/config"
"clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/socketclients"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -95,6 +100,8 @@ func Init() {
go func() { go func() {
for d := range msgs { for d := range msgs {
log.Printf(" [x] %s", d.Body) log.Printf(" [x] %s", d.Body)
socketclients.BroadcastMessage(d.Body)
} }
}() }()
@ -129,6 +136,43 @@ func Init() {
<-forever */ <-forever */
} }
func PublishBroadcastMessage(rabbitMqMessage structs.RabbitMqMessage) error {
msg, err := json.Marshal(rabbitMqMessage)
if err != nil {
logrus.Errorln("Failed to marshal rabbitMqMessage, err:", err)
return err
}
err = publishMessage(msg, MessagesQueue.Name)
return err
}
func publishMessage(body []byte, channelName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := Channel.PublishWithContext(ctx,
"messages", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
if err != nil {
logrus.Errorln("Failed to publish a message, err:", err)
return err
}
logrus.Printf("[x] Sent %s\n", body)
return nil
}
/* /*
func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) { func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) {
q, err := channel.QueueDeclare( q, err := channel.QueueDeclare(

View File

@ -1,7 +0,0 @@
package structs
import "github.com/gofiber/websocket/v2"
type SocketClient struct {
Conn *websocket.Conn
}

View File

@ -1,8 +0,0 @@
package structs
import "github.com/gofiber/websocket/v2"
type SocketMessage struct {
Conn *websocket.Conn
Msg []byte
}

View File

@ -0,0 +1,7 @@
package structs
type RabbitMqMessage struct {
Cmd int
Rec string
Body any
}

56
modules/structs/socket.go Normal file
View File

@ -0,0 +1,56 @@
package structs
import (
"encoding/json"
"sync"
"github.com/gofiber/websocket/v2"
"github.com/sirupsen/logrus"
)
type SocketClient struct {
Conn *websocket.Conn
connMu sync.Mutex
}
type SocketMessage struct {
Conn *websocket.Conn
Msg []byte
}
func (socketClient *SocketClient) SendMessage(message SendSocketMessage) error {
marshaledMessage, err := json.Marshal(message)
if err != nil {
logrus.Errorln("Failed to send ws message, err:", err)
return err
}
socketClient.connMu.Lock()
defer socketClient.connMu.Unlock()
err = socketClient.Conn.WriteMessage(websocket.TextMessage, marshaledMessage)
if err != nil {
logrus.Errorln("Failed to write ws message, err:", err)
return err
}
return nil
}
type SendSocketMessage struct {
Cmd int
Body any
}
type SocketMessageTest struct {
Cmd int
Body string
}
type ReceivedSocketMessage struct {
Cmd int
Rec string // represent receiver user id
Body any
}

View File

@ -0,0 +1,23 @@
package socketclients
import (
"clickandjoin.app/websocketserver/modules/cache"
"clickandjoin.app/websocketserver/modules/structs"
"github.com/gofiber/websocket/v2"
)
func BroadcastMessage(message []byte) {
for _, client := range cache.SocketClients {
client.Conn.WriteMessage(websocket.TextMessage, message)
}
}
func IsReceiverConnectedToThisServer(recId string) (isConnected bool, socketClient *structs.SocketClient) {
for id, client := range cache.SocketClients {
if id == recId {
return true, client
}
}
return false, socketClient
}

View File

@ -1,8 +1,14 @@
package socketserver package socketserver
import ( import (
"encoding/json"
"clickandjoin.app/websocketserver/modules/cache"
"clickandjoin.app/websocketserver/modules/rabbitmq"
"clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/socketclients"
"github.com/gofiber/websocket/v2" "github.com/gofiber/websocket/v2"
"github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -14,15 +20,53 @@ func RunHub() {
for { for {
select { select {
case newSocketClient := <-register: case newSocketClient := <-register:
logrus.Println("register", newSocketClient) uuid := uuid.New().String()
newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte("Good afternoon")) cache.SocketClients[uuid] = newSocketClient
logrus.Println("register client", uuid)
// for testing
marshaled, err := json.Marshal(structs.SocketMessageTest{Cmd: 99999, Body: uuid})
if err != nil {
logrus.Errorln("Failed to marshal uuid, err:", err)
}
newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte(marshaled))
case data := <-broadcast: case data := <-broadcast:
logrus.Println("data", data) recMsg := structs.ReceivedSocketMessage{}
err := json.Unmarshal(data.Msg, &recMsg)
if err != nil {
logrus.Errorln("Failed err:", err)
}
logrus.Println(recMsg)
if recMsg.Rec != "" {
isConnected, recSocketClient := socketclients.IsReceiverConnectedToThisServer(recMsg.Rec)
// send message to target receiver when connected to this server
if isConnected {
recSocketClient.SendMessage(structs.SendSocketMessage{Cmd: recMsg.Cmd, Body: recMsg.Body})
} else {
logrus.Println("rec not found")
rabbitmq.PublishBroadcastMessage(structs.RabbitMqMessage{Cmd: recMsg.Cmd, Rec: recMsg.Rec, Body: recMsg.Body})
}
}
case connection := <-unregister: case connection := <-unregister:
logrus.Println("unregister", connection) logrus.Println("unregister", connection)
for id, client := range cache.SocketClients {
if connection == client.Conn {
delete(cache.SocketClients, id)
}
}
} }
} }
} }

View File

@ -1,11 +1,10 @@
package socketserver package socketserver
import ( import (
"log"
"clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/structs"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2" "github.com/gofiber/websocket/v2"
"github.com/sirupsen/logrus"
) )
func WebSocketServer(app *fiber.App) { func WebSocketServer(app *fiber.App) {
@ -22,16 +21,16 @@ func WebSocketServer(app *fiber.App) {
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println("read error:", err) logrus.Errorln("Read err:", err)
} }
return return
} }
if messageType == websocket.BinaryMessage { if messageType == websocket.TextMessage {
broadcast <- structs.SocketMessage{Conn: c, Msg: msg} broadcast <- structs.SocketMessage{Conn: c, Msg: msg}
} else { } else {
log.Println("websocket message received of type", messageType) logrus.Println("websocket message received of type", messageType)
} }
} }
})) }))