ensure that messages are received through ack

master
Alex 2022-01-29 23:29:56 +01:00
parent a40b109f8f
commit ffcb9308ff
4 changed files with 59 additions and 6 deletions

View File

@ -4,3 +4,15 @@ import "krakatoa.net/backend/modules/structs"
var WebClients = make(map[string]*structs.WebClient) var WebClients = make(map[string]*structs.WebClient)
var MinecraftClients = make(map[string]*structs.MinecraftClient) var MinecraftClients = make(map[string]*structs.MinecraftClient)
var WebVoiceMessagesSendQueue = make(map[int]*structs.WebClientSendQueueMessage)
var WebMobileMessagesSendQueue = make(map[int]*structs.WebClientSendQueueMessage)
func IsInWebMessagesSendQueue(queue map[int]*structs.WebClientSendQueueMessage, cmdID int) bool {
for CmdID, _ := range queue {
if CmdID == cmdID {
return true
}
}
return false
}

View File

@ -3,6 +3,7 @@ package structs
import ( import (
"errors" "errors"
"sync" "sync"
"time"
"github.com/gofiber/websocket/v2" "github.com/gofiber/websocket/v2"
"krakatoa.net/backend/modules/logger" "krakatoa.net/backend/modules/logger"
@ -38,6 +39,13 @@ type WebClient struct {
CurrentVoiceCmdIDIndexByBackend int CurrentVoiceCmdIDIndexByBackend int
} }
type WebClientSendQueueMessage struct {
MessageRaw []byte
CmdID int
TrySendCount int
Time time.Time
}
func (webClient *WebClient) SendBinaryMessage(conn *websocket.Conn, msg []byte) error { func (webClient *WebClient) SendBinaryMessage(conn *websocket.Conn, msg []byte) error {
if conn == webClient.MobileConn { if conn == webClient.MobileConn {
webClient.mobileConnMu.Lock() webClient.mobileConnMu.Lock()

View File

@ -1,5 +1,13 @@
package serverCommunication package serverCommunication
import (
"fmt"
"time"
"krakatoa.net/backend/modules/cache"
"krakatoa.net/backend/modules/logger"
)
/* /*
func getClientByDest(dest int, uuid string) (*structs.WebClient, *structs.MinecraftClient) { func getClientByDest(dest int, uuid string) (*structs.WebClient, *structs.MinecraftClient) {
switch dest { switch dest {
@ -17,3 +25,15 @@ func getClientByDest(dest int, uuid string) (*structs.WebClient, *structs.Minecr
} }
} }
*/ */
func AckHandler() {
ticker := time.NewTicker(20 * time.Millisecond)
for _ = range ticker.C {
fmt.Println("")
for cmdID, object := range cache.WebVoiceMessagesSendQueue {
logger.Web.Println("ackHandler", cmdID, object)
}
}
}

View File

@ -54,6 +54,14 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
return return
} }
} }
if isCmdIDInList(webClient.VoiceCmdIDs, cmdID) {
webClient.RemoveVoiceCmdID(cmdID)
}
if cache.IsInWebMessagesSendQueue(cache.WebVoiceMessagesSendQueue, cmdID) {
delete(cache.WebVoiceMessagesSendQueue, cmdID)
}
} else { } else {
for _, data := range webClient.MobileCmdIDsByBackend { for _, data := range webClient.MobileCmdIDsByBackend {
if data.WebCmdID == cmdID { // forward to target dest if data.WebCmdID == cmdID { // forward to target dest
@ -76,12 +84,10 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
return return
} }
} }
}
if isVoice && isCmdIDInList(webClient.VoiceCmdIDs, cmdID) { if isCmdIDInList(webClient.MobileCmdIDs, cmdID) {
webClient.RemoveVoiceCmdID(cmdID) webClient.RemoveMobileCmdID(cmdID)
} else if isCmdIDInList(webClient.MobileCmdIDs, cmdID) { }
webClient.RemoveMobileCmdID(cmdID)
} }
return return
} }
@ -119,7 +125,6 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
webClient.MobileCmdIDs = append(webClient.MobileCmdIDs, cmdID) webClient.MobileCmdIDs = append(webClient.MobileCmdIDs, cmdID)
logger.Web.Debugln("after MobileCmdIDs", webClient.MobileCmdIDs) logger.Web.Debugln("after MobileCmdIDs", webClient.MobileCmdIDs)
} }
return
} }
// no ack // no ack
@ -143,6 +148,10 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
if err != nil { if err != nil {
logger.WebVoice.Warnln("write:", err) logger.WebVoice.Warnln("write:", err)
} }
if status == kraProtocol.StatusGet {
cache.WebVoiceMessagesSendQueue[cmdID] = &structs.WebClientSendQueueMessage{MessageRaw: raw, CmdID: cmdID, TrySendCount: 0, Time: time.Now()}
}
} else { // mobile not connected } else { // mobile not connected
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 58299, cmdNumber, "") raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 58299, cmdNumber, "")
@ -176,6 +185,10 @@ func HandleWebMessage(isVoice bool, conn *websocket.Conn, uuid string, msg []byt
if err != nil { if err != nil {
logger.WebMobile.Warnln("write:", err) logger.WebMobile.Warnln("write:", err)
} }
if status == kraProtocol.StatusGet {
cache.WebMobileMessagesSendQueue[cmdID] = &structs.WebClientSendQueueMessage{MessageRaw: raw, CmdID: cmdID, TrySendCount: 0, Time: time.Now()}
}
} else { // voice not connected } else { // voice not connected
raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 5456, cmdNumber, "") raw = kraProtocol.EncodeWebMessage(kraProtocol.StatusSend, 5456, cmdNumber, "")