ensure that messages are received through ack

master
Alex 2022-01-30 14:24:45 +01:00
parent ffcb9308ff
commit a2fd670878
9 changed files with 78 additions and 47 deletions

View File

@ -6,6 +6,7 @@ import (
"krakatoa.net/backend/modules/configs/serverConfig"
"krakatoa.net/backend/modules/logger"
"krakatoa.net/backend/modules/mongo"
"krakatoa.net/backend/serverCommunication"
"krakatoa.net/backend/servers/minecraft"
"krakatoa.net/backend/servers/web"
)
@ -25,6 +26,7 @@ func init() {
func main() {
web.RunServer()
go web.Test()
go serverCommunication.AckHandler()
minecraft.RunServer()
}

View File

@ -4,15 +4,3 @@ import "krakatoa.net/backend/modules/structs"
var WebClients = make(map[string]*structs.WebClient)
var MinecraftClients = make(map[string]*structs.MinecraftClient)
var WebVoiceMessagesSendQueue = make(map[int]*structs.WebClientSendQueueMessage)
var WebMobileMessagesSendQueue = make(map[int]*structs.WebClientSendQueueMessage)
func IsInWebMessagesSendQueue(queue map[int]*structs.WebClientSendQueueMessage, cmdID int) bool {
for CmdID, _ := range queue {
if CmdID == cmdID {
return true
}
}
return false
}

View File

@ -14,15 +14,15 @@ const (
StatusErrTryAgain = 14
StatusErrArgLenTooBig = 15
StatusReply = 16
)
const (
DestBackend = 1
DestVoice = 2
DestMobile = 3
DestProxy = 10
DestPlayersCurrentServer = 11
DefaultCmdID = 1
)
func EncodeWebMessage(status int, cmdID int, cmdNumber int, args interface{}) []byte {

View File

@ -8,11 +8,16 @@ import (
)
type MinecraftClient struct {
Name string
Conn *websocket.Conn
connMu sync.Mutex
CmdIDs []int
cmdIDMu sync.Mutex
Name string
Conn *websocket.Conn
connMu sync.Mutex
CmdIDs []int // cache for received cmdIDs
cmdIDMu sync.Mutex
CmdIDsByBackend []*A
CmdIDsByBackendMu sync.Mutex
CurrentCmdIDIndexByBackend int
SendQueueMessages []*SendQueueMessage
sendQueueMessagesMu sync.Mutex
}
func (mcClient *MinecraftClient) SendBinaryMessage(msg []byte) error {
@ -33,3 +38,28 @@ func (mcClient *MinecraftClient) RemoveCmdID(cmdID int) {
mcClient.CmdIDs = newArr
}
}
func (mcClient *MinecraftClient) GenerateMinecraftCmdID() int {
if mcClient.CurrentCmdIDIndexByBackend >= 4294967296 || mcClient.CurrentCmdIDIndexByBackend < 10 {
return 10
}
mcClient.CurrentCmdIDIndexByBackend++
return mcClient.CurrentCmdIDIndexByBackend
}
func (mcClient *MinecraftClient) RemoveSendMessageFromQueueByCmdID(cmdID int) {
mcClient.sendQueueMessagesMu.Lock()
defer mcClient.sendQueueMessagesMu.Unlock()
for index, message := range mcClient.SendQueueMessages {
if message.CmdID == cmdID {
logger.Minecraft.Debugln("RemoveSendMessageFromQueueByCmdID before", mcClient.SendQueueMessages)
newArr := append(mcClient.SendQueueMessages[:index], mcClient.SendQueueMessages[index+1:]...)
mcClient.SendQueueMessages = newArr
logger.Minecraft.Debugln("RemoveSendMessageFromQueueByCmdID after", mcClient.SendQueueMessages)
break
}
}
}

View File

@ -0,0 +1,10 @@
package structs
import "time"
type SendQueueMessage struct {
MessageRaw []byte
CmdID int
TrySendCount int
Time time.Time
}

View File

@ -3,7 +3,6 @@ package structs
import (
"errors"
"sync"
"time"
"github.com/gofiber/websocket/v2"
"krakatoa.net/backend/modules/logger"
@ -25,25 +24,20 @@ type WebClient struct {
Uuid string
MobileConn *websocket.Conn
mobileConnMu sync.Mutex
MobileCmdIDs []int
MobileCmdIDs []int // cache for received cmdIDs
mobileCmdIDMu sync.Mutex
MobileCmdIDsByBackend []*A
mobileCmdIDsByBackendMu sync.Mutex
CurrentMobileVoiceCmdIDIndexByBackend int
VoiceConn *websocket.Conn
voiceConnMu sync.Mutex
VoiceCmdIDs []int
VoiceCmdIDs []int // cache for received cmdIDs
voiceCmdIDMu sync.Mutex
VoiceCmdIDsByBackend []*A // messages from backend to voice, when response from voice then response to target requester
voiceCmdIDsByBackendMu sync.Mutex
CurrentVoiceCmdIDIndexByBackend int
}
type WebClientSendQueueMessage struct {
MessageRaw []byte
CmdID int
TrySendCount int
Time time.Time
SendVoiceQueueMessages []*SendQueueMessage
SendMobileQueueMessages []*SendQueueMessage
}
func (webClient *WebClient) SendBinaryMessage(conn *websocket.Conn, msg []byte) error {

View File

@ -2,6 +2,7 @@ package serverCommunication
import (
"errors"
"time"
"github.com/gofiber/websocket/v2"
"krakatoa.net/backend/modules/cache"
@ -108,6 +109,12 @@ func HandleMinecraftMessage(conn *websocket.Conn, msg []byte) {
}
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusGet, webCmdID, cmdNumber, args)
if dest == kraProtocol.DestVoice {
webClient.SendVoiceQueueMessages = append(webClient.SendVoiceQueueMessages, &structs.SendQueueMessage{MessageRaw: raw, CmdID: webCmdID, TrySendCount: 0, Time: time.Now()})
} else {
webClient.SendMobileQueueMessages = append(webClient.SendMobileQueueMessages, &structs.SendQueueMessage{MessageRaw: raw, CmdID: webCmdID, TrySendCount: 0, Time: time.Now()})
}
} else {
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 0, cmdNumber, args)
}
@ -198,8 +205,7 @@ func SendMessageToServer(dest int, playerUuid string, args string) {
}
func SendMessageToMinecraftServer(status int, dest int, playerUuid string, cmdNumber int, args string) error {
cmdID := 0
raw := kraProtocol.EncodeJavaMessage(status, cmdID, dest, playerUuid, cmdNumber, args)
cmdID := kraProtocol.DefaultCmdID
serverName := getMinecraftServerNameByDest(dest)
@ -209,11 +215,22 @@ func SendMessageToMinecraftServer(status int, dest int, playerUuid string, cmdNu
return errors.New("mcClient nil")
}
if status == kraProtocol.StatusGet {
cmdID = mcClient.GenerateMinecraftCmdID()
}
raw := kraProtocol.EncodeJavaMessage(status, cmdID, dest, playerUuid, cmdNumber, args)
err := mcClient.SendBinaryMessage(raw)
if err != nil {
logger.Minecraft.Warnln("err", err)
return errors.New("write err: " + err.Error())
}
if status == kraProtocol.StatusGet {
mcClient.SendQueueMessages = append(mcClient.SendQueueMessages, &structs.SendQueueMessage{MessageRaw: raw, CmdID: cmdID, TrySendCount: 0, Time: time.Now()})
}
return nil
}

View File

@ -1,7 +1,6 @@
package serverCommunication
import (
"fmt"
"time"
"krakatoa.net/backend/modules/cache"
@ -30,10 +29,13 @@ func AckHandler() {
ticker := time.NewTicker(20 * time.Millisecond)
for _ = range ticker.C {
fmt.Println("")
for uuid, webClient := range cache.WebClients {
logger.Web.Println("ackHandler web client", uuid, webClient)
for cmdID, object := range cache.WebVoiceMessagesSendQueue {
logger.Web.Println("ackHandler", cmdID, object)
}
for serverName, mcClient := range cache.MinecraftClients {
logger.Web.Println("ackHandler mc client", serverName, mcClient)
}
}
}

View File

@ -58,10 +58,6 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
if isCmdIDInList(webClient.VoiceCmdIDs, cmdID) {
webClient.RemoveVoiceCmdID(cmdID)
}
if cache.IsInWebMessagesSendQueue(cache.WebVoiceMessagesSendQueue, cmdID) {
delete(cache.WebVoiceMessagesSendQueue, cmdID)
}
} else {
for _, data := range webClient.MobileCmdIDsByBackend {
if data.WebCmdID == cmdID { // forward to target dest
@ -148,10 +144,6 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
if err != nil {
logger.WebVoice.Warnln("write:", err)
}
if status == kraProtocol.StatusGet {
cache.WebVoiceMessagesSendQueue[cmdID] = &structs.WebClientSendQueueMessage{MessageRaw: raw, CmdID: cmdID, TrySendCount: 0, Time: time.Now()}
}
} else { // mobile not connected
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 58299, cmdNumber, "")
@ -185,10 +177,6 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
if err != nil {
logger.WebMobile.Warnln("write:", err)
}
if status == kraProtocol.StatusGet {
cache.WebMobileMessagesSendQueue[cmdID] = &structs.WebClientSendQueueMessage{MessageRaw: raw, CmdID: cmdID, TrySendCount: 0, Time: time.Now()}
}
} else { // voice not connected
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 5456, cmdNumber, "")