added timer task for ack message handling

master
Alex 2022-02-28 19:36:48 +01:00
parent 64000b23b7
commit 9ae2c47057
6 changed files with 57 additions and 30 deletions

View File

@ -3,10 +3,9 @@ package earth.krakatao;
import earth.krakatao.events.KraSocketClientEventInitiater; import earth.krakatao.events.KraSocketClientEventInitiater;
import earth.krakatao.events.KraSocketClientEventInterface; import earth.krakatao.events.KraSocketClientEventInterface;
import earth.krakatao.protocol.KraSocketClientProtocol; import earth.krakatao.protocol.KraSocketClientProtocol;
import earth.krakatao.protocol.KraSocketClientProtocolMessage;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.function.Consumer; import java.util.List;
import java.util.logging.Logger; import java.util.logging.Logger;
import lombok.Getter; import lombok.Getter;
@ -22,7 +21,8 @@ public class KraSocketClient {
public KraSocketClient(KraSocketClientConfig kraSocketClientConfig, public KraSocketClient(KraSocketClientConfig kraSocketClientConfig,
KraSocketClientEventInterface kraSocketClientEventInterface, KraSocketClientEventInterface kraSocketClientEventInterface,
KraSocketClientProtocol kraSocketClientProtocol) throws URISyntaxException { KraSocketClientProtocol kraSocketClientProtocol, List<Long> webSocketAckTimeouts)
throws URISyntaxException {
this.kraSocketClientConfig = kraSocketClientConfig; this.kraSocketClientConfig = kraSocketClientConfig;
this.kraSocketClientEventInitiater = new KraSocketClientEventInitiater(); this.kraSocketClientEventInitiater = new KraSocketClientEventInitiater();
@ -33,6 +33,6 @@ public class KraSocketClient {
+ this.kraSocketClientConfig.getWebSocketPort() + this.kraSocketClientConfig.getWebSocketPort()
+ "/ws?ak=" + this.kraSocketClientConfig.getWebSocketAccessKey() + "/ws?ak=" + this.kraSocketClientConfig.getWebSocketAccessKey()
+ "&s=" + this.kraSocketClientConfig.getWebSocketServerName()), + "&s=" + this.kraSocketClientConfig.getWebSocketServerName()),
this.kraSocketClientEventInitiater, kraSocketClientProtocol); this.kraSocketClientEventInitiater, kraSocketClientProtocol, webSocketAckTimeouts);
} }
} }

View File

@ -1,14 +1,16 @@
package earth.krakatao; package earth.krakatao;
import java.util.List;
import lombok.Data; import lombok.Data;
@Data @Data
public class KraSocketClientConfig { public class KraSocketClientConfig {
private final String WebSocketProtocol; private final String webSocketProtocol;
private final String WebSocketHost; private final String webSocketHost;
private final int WebSocketPort; private final int webSocketPort;
private final String WebSocketAccessKey; private final String webSocketAccessKey;
private final String WebSocketServerName; private final String webSocketServerName;
private final List<Long> webSocketAckTimeouts;
} }

View File

@ -2,7 +2,6 @@ package earth.krakatao;
import earth.krakatao.protocol.KraSocketClientProtocolMessage; import earth.krakatao.protocol.KraSocketClientProtocolMessage;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.function.Consumer;
public class KraSocketClientMessageTimerTask extends TimerTask { public class KraSocketClientMessageTimerTask extends TimerTask {
@ -15,13 +14,28 @@ public class KraSocketClientMessageTimerTask extends TimerTask {
@Override @Override
public void run() { public void run() {
KraSocketClient.getLogger().info( KraSocketClient.getLogger().info(
"MessageTimerTask is running " + this.socketClient.getKraProtocol().getConsumerHashMap() "MessageTimerTask is running. SendQueueMessages size: " + this.socketClient.getKraProtocol()
.getSendQueueMessages()
.size()); .size());
for (Consumer<KraSocketClientProtocolMessage> consumer : this.socketClient.getKraProtocol() long currentTimeMillis = System.currentTimeMillis();
.getConsumerHashMap().values()) {
KraSocketClient.getLogger().info("consumer: " + consumer); for (KraSocketClientProtocolMessage msg : this.socketClient.getKraProtocol()
.getSendQueueMessages()
.values()) {
if ((currentTimeMillis - msg.getTime())
> this.socketClient.getWebSocketAckTimeouts()
.get(msg.getTrySendCount())) {
KraSocketClient.getLogger().info("MessageTimerTask here");
msg.setTime(currentTimeMillis);
this.socketClient.SendMessage(msg);
if (msg.getTrySendCount() < 4) {
msg.setTrySendCount(msg.getTrySendCount() + 1);
}
}
} }
} }
} }

View File

@ -6,6 +6,7 @@ import earth.krakatao.protocol.KraSocketClientProtocolMessage;
import earth.krakatao.protocol.KraSocketClientProtocolStatus; import earth.krakatao.protocol.KraSocketClientProtocolStatus;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import lombok.Getter; import lombok.Getter;
@ -14,18 +15,20 @@ import org.java_websocket.handshake.ServerHandshake;
public class SocketClient extends WebSocketClient { public class SocketClient extends WebSocketClient {
private final KraSocketClientEventInitiater kraSocketClientEventInitiater;
@Getter @Getter
private final KraSocketClientProtocol kraProtocol; private final KraSocketClientProtocol kraProtocol;
private final KraSocketClientEventInitiater kraSocketClientEventInitiater; @Getter
private TimerTask timerTask; private final List<Long> webSocketAckTimeouts;
private Timer timer; private Timer timer;
public SocketClient(URI serverUri, KraSocketClientEventInitiater kraSocketClientEventInitiater, public SocketClient(URI serverUri, KraSocketClientEventInitiater kraSocketClientEventInitiater,
KraSocketClientProtocol kraProtocol) { KraSocketClientProtocol kraProtocol, List<Long> webSocketAckTimeouts) {
super(serverUri); super(serverUri);
this.kraSocketClientEventInitiater = kraSocketClientEventInitiater; this.kraSocketClientEventInitiater = kraSocketClientEventInitiater;
this.kraProtocol = kraProtocol; this.kraProtocol = kraProtocol;
this.webSocketAckTimeouts = webSocketAckTimeouts;
} }
public void SendMessage(KraSocketClientProtocolMessage kraProtocolMessage) { public void SendMessage(KraSocketClientProtocolMessage kraProtocolMessage) {
@ -44,7 +47,7 @@ public class SocketClient extends WebSocketClient {
this.kraSocketClientEventInitiater.callOnOpen(serverHandshake); this.kraSocketClientEventInitiater.callOnOpen(serverHandshake);
KraSocketClient.getLogger().info("connection opened"); KraSocketClient.getLogger().info("connection opened");
this.timerTask = new KraSocketClientMessageTimerTask(this); TimerTask timerTask = new KraSocketClientMessageTimerTask(this);
this.timer = new Timer(true); this.timer = new Timer(true);
this.timer.scheduleAtFixedRate(timerTask, 0, 3 * 1000); this.timer.scheduleAtFixedRate(timerTask, 0, 3 * 1000);
} }
@ -71,13 +74,13 @@ public class SocketClient extends WebSocketClient {
// response to the user that this message cmdID is already in the queue // response to the user that this message cmdID is already in the queue
if (kraProtocolMessage.getStatus() == KraSocketClientProtocolStatus.GET.getStatus() if (kraProtocolMessage.getStatus() == KraSocketClientProtocolStatus.GET.getStatus()
&& this.kraProtocol.getCmdIDs().contains(kraProtocolMessage.getCmdID())) { && this.kraProtocol.getSendQueueMessages().containsKey(kraProtocolMessage.getCmdID())) {
// TODO: uuid // TODO: uuid
kraProtocolMessage = new KraSocketClientProtocolMessage( kraProtocolMessage = new KraSocketClientProtocolMessage(
KraSocketClientProtocolStatus.MESSAGE_ALREADY_IN_QUEUE.getStatus(), KraSocketClientProtocolStatus.MESSAGE_ALREADY_IN_QUEUE.getStatus(),
kraProtocolMessage.getStatus() == KraSocketClientProtocolStatus.GET.getStatus() kraProtocolMessage.getStatus() == KraSocketClientProtocolStatus.GET.getStatus()
? kraProtocolMessage.getStatus() : 0, 0, ? kraProtocolMessage.getStatus() : 0, 0,
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", (short) kraProtocolMessage.getCmdNumber(), ""); "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", (short) kraProtocolMessage.getCmdNumber(), "", null);
KraSocketClient.getLogger() KraSocketClient.getLogger()
.info("response MESSAGE_ALREADY_IN_QUEUE " + kraProtocolMessage.getCmdID()); .info("response MESSAGE_ALREADY_IN_QUEUE " + kraProtocolMessage.getCmdID());

View File

@ -11,17 +11,21 @@ import lombok.Getter;
public class KraSocketClientProtocol { public class KraSocketClientProtocol {
@Getter //@Getter
private final ArrayList<Integer> cmdIDs; //private final ArrayList<Integer> cmdIDs;
@Getter @Getter
private int currentCmdIDIndex; private int currentCmdIDIndex;
//@Getter
//private final HashMap<Integer, Consumer<KraSocketClientProtocolMessage>> consumerHashMap;
@Getter @Getter
private final HashMap<Integer, Consumer<KraSocketClientProtocolMessage>> consumerHashMap; private final HashMap<Integer, KraSocketClientProtocolMessage> sendQueueMessages;
public KraSocketClientProtocol() { public KraSocketClientProtocol() {
this.cmdIDs = new ArrayList<>(); //this.cmdIDs = new ArrayList<>();
this.currentCmdIDIndex = 9; this.currentCmdIDIndex = 9;
this.consumerHashMap = new HashMap<>(); //this.consumerHashMap = new HashMap<>();
this.sendQueueMessages = new HashMap<>();
} }
public int generateCmdID() { public int generateCmdID() {
@ -79,7 +83,7 @@ public class KraSocketClientProtocol {
} }
} }
this.cmdIDs.add(cmdID); //this.cmdIDs.add(cmdID);
return raw; return raw;
} }
@ -125,10 +129,11 @@ public class KraSocketClientProtocol {
+ cmdNumber + " " + args + " "); + cmdNumber + " " + args + " ");
return new KraSocketClientProtocolMessage(status, cmdID, dest, return new KraSocketClientProtocolMessage(status, cmdID, dest,
Formatter.stringToUuid(playerUuid).toString(), cmdNumber, args); Formatter.stringToUuid(playerUuid).toString(), cmdNumber, args, null);
} }
public void removeData(int cmdID) { public void removeData(int cmdID) {
/*
Iterator<Integer> iterator = this.cmdIDs.iterator(); Iterator<Integer> iterator = this.cmdIDs.iterator();
KraSocketClient.getLogger().info("before: " + this.cmdIDs); KraSocketClient.getLogger().info("before: " + this.cmdIDs);
@ -140,6 +145,6 @@ public class KraSocketClientProtocol {
} }
KraSocketClient.getLogger().info("after: " + this.cmdIDs); KraSocketClient.getLogger().info("after: " + this.cmdIDs);
this.consumerHashMap.remove(cmdID); this.consumerHashMap.remove(cmdID); */
} }
} }

View File

@ -1,5 +1,6 @@
package earth.krakatao.protocol; package earth.krakatao.protocol;
import java.util.function.Consumer;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -12,19 +13,21 @@ public class KraSocketClientProtocolMessage {
private final String uuid; private final String uuid;
private final short cmdNumber; private final short cmdNumber;
private final String args; private final String args;
private final Consumer<KraSocketClientProtocolMessage> messageConsumer;
@Setter @Setter
private int trySendCount; private int trySendCount;
@Setter @Setter
private long time; private long time;
public KraSocketClientProtocolMessage(byte status, int cmdID, int dest, String uuid, public KraSocketClientProtocolMessage(byte status, int cmdID, int dest, String uuid,
short cmdNumber, String args) { short cmdNumber, String args, Consumer<KraSocketClientProtocolMessage> messageConsumer) {
this.status = status; this.status = status;
this.cmdID = cmdID; this.cmdID = cmdID;
this.dest = dest; this.dest = dest;
this.uuid = uuid; this.uuid = uuid;
this.cmdNumber = cmdNumber; this.cmdNumber = cmdNumber;
this.args = args; this.args = args;
this.messageConsumer = messageConsumer;
this.trySendCount = 0; this.trySendCount = 0;
this.time = System.currentTimeMillis(); this.time = System.currentTimeMillis();