From 1ca0a07b3ae984e801c829edecb9667e4d7ccbbd Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 10 Apr 2022 20:17:31 +0200 Subject: [PATCH] mongodb reactive stream driver --- pom.xml | 241 +++++++++--------- .../java/net/krakatoa/proxy/ProxySystem.java | 24 +- .../proxy/command/KrakatoapCommand.java | 3 +- .../listener/PlayerDisconnectListener.java | 42 +-- .../proxy/listener/PostLoginListener.java | 177 +++++++------ 5 files changed, 247 insertions(+), 240 deletions(-) diff --git a/pom.xml b/pom.xml index 9dfec7b..a6c00fe 100755 --- a/pom.xml +++ b/pom.xml @@ -1,126 +1,131 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - net.krakatoa.proxy - KraProxySystem - 1.0-SNAPSHOT + net.krakatoa.proxy + KraProxySystem + 1.0-SNAPSHOT - - 14 - 14 - + + 14 + 14 + - - clean install - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - - single - - make-assembly - package - - - - - maven-compiler-plugin - - 9 - 9 - - org.apache.maven.plugins - - - + + clean install + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + single + + make-assembly + package + + + + + maven-compiler-plugin + + 9 + 9 + + org.apache.maven.plugins + + + - - - org.msgpack - msgpack-core - 0.9.0 - - - org.redisson - redisson - 3.17.0 - - - net.md-5 - bungeecord-api - 1.17-R0.1-SNAPSHOT - jar - provided - - - lombok - org.projectlombok - provided - 1.18.22 - - - mongodb-driver-async - org.mongodb - 3.12.10 - - - de.dytanic.cloudnet - cloudnet-bridge - 3.4.0-RELEASE - provided - - - de.dytanic.cloudnet - cloudnet-driver - 3.4.0-RELEASE - provided - - - earth.krakatoa - KraSocketClient - 1.2-SNAPSHOT - - + + + org.msgpack + msgpack-core + 0.9.0 + + + org.redisson + redisson + 3.17.0 + + + net.md-5 + bungeecord-api + 1.17-R0.1-SNAPSHOT + jar + provided + + + lombok + org.projectlombok + provided + 1.18.22 + + + mongodb-driver-async + org.mongodb + 3.12.10 + + + de.dytanic.cloudnet + cloudnet-bridge + 3.4.0-RELEASE + provided + + + de.dytanic.cloudnet + cloudnet-driver + 3.4.0-RELEASE + provided + + + earth.krakatoa + KraSocketClient + 1.2-SNAPSHOT + + + earth.krakatoa + KraCore + 1.0-SNAPSHOT + + - - - bungeecord-repo - https://oss.sonatype.org/content/repositories/snapshots - - - - sonatype-nexus-snapshots - - false - - - true - - https://oss.sonatype.org/content/repositories/snapshots - - - krakatoa-earth - https://repo.krakatoa.umbach.dev/repository/krakatoa-earth/ - - true - - - true - - - - cloudnet-releases - https://repo.cloudnetservice.eu/repository/releases/ - - + + + bungeecord-repo + https://oss.sonatype.org/content/repositories/snapshots + + + + sonatype-nexus-snapshots + + false + + + true + + https://oss.sonatype.org/content/repositories/snapshots + + + krakatoa-earth + https://repo.krakatoa.umbach.dev/repository/krakatoa-earth/ + + true + + + true + + + + cloudnet-releases + https://repo.cloudnetservice.eu/repository/releases/ + + \ No newline at end of file diff --git a/src/main/java/net/krakatoa/proxy/ProxySystem.java b/src/main/java/net/krakatoa/proxy/ProxySystem.java index cbd6ea4..28a9e78 100755 --- a/src/main/java/net/krakatoa/proxy/ProxySystem.java +++ b/src/main/java/net/krakatoa/proxy/ProxySystem.java @@ -5,6 +5,11 @@ import earth.krakatao.KraSocketClientConfig; import earth.krakatao.events.KraSocketClientEventInitiater; import earth.krakatao.events.KraSocketClientEventInterface; import earth.krakatao.protocol.KraSocketClientProtocol; +import earth.krakatoa.core.config.MongoConfig; +import earth.krakatoa.core.config.RedisConfig; +import earth.krakatoa.core.mongo.MongoManager; +import earth.krakatoa.core.redis.RedisManager; +import earth.krakatoa.core.util.Formatter; import java.util.Arrays; import lombok.Getter; import lombok.SneakyThrows; @@ -16,9 +21,6 @@ import net.krakatoa.proxy.listener.LoginListener; import net.krakatoa.proxy.listener.PlayerDisconnectListener; import net.krakatoa.proxy.listener.PostLoginListener; import net.krakatoa.proxy.listener.SocketClientMessageListener; -import net.krakatoa.proxy.mongo.MongoManager; -import net.krakatoa.proxy.redis.RedisManager; -import net.krakatoa.proxy.util.Formatter; import net.md_5.bungee.api.plugin.Plugin; import net.md_5.bungee.api.plugin.PluginManager; @@ -62,12 +64,13 @@ public class ProxySystem extends Plugin { this.kraSocketClient.getSocketClient().connect(); - this.mongoManager = new MongoManager(this.configHandler.getMongodbHost(), - this.configHandler.getMongodbPort(), this.configHandler.getMongodbUsername(), - this.configHandler.getMongodbPassword()); - this.mongoManager.connect(this.configHandler.getMongodbDatabase()); + this.mongoManager = new MongoManager( + new MongoConfig(this.configHandler.getMongodbHost(), this.configHandler.getMongodbPort(), + this.configHandler.getMongodbUsername(), this.configHandler.getMongodbPassword(), + this.configHandler.getMongodbDatabase())); - this.redisManager = new RedisManager("redis://127.0.0.1:6379"); + this.redisManager = new RedisManager( + new RedisConfig(this.configHandler.getRedisHost(), this.configHandler.getRedisPort())); this.redisManager.connect(); loadListeners(); @@ -79,9 +82,8 @@ public class ProxySystem extends Plugin { this.kraSocketClient.getSocketClient().getConnection().close(); this.kraSocketClient.getSocketClient().getConnection().closeConnection(1, ""); - this.mongoManager.getMongoClient().close(); - - this.redisManager.close(); + this.mongoManager.shutdown(); + this.redisManager.shutdown(); instance = null; } diff --git a/src/main/java/net/krakatoa/proxy/command/KrakatoapCommand.java b/src/main/java/net/krakatoa/proxy/command/KrakatoapCommand.java index d4b7c15..56ddbd3 100755 --- a/src/main/java/net/krakatoa/proxy/command/KrakatoapCommand.java +++ b/src/main/java/net/krakatoa/proxy/command/KrakatoapCommand.java @@ -33,8 +33,7 @@ public class KrakatoapCommand extends Command { .getKraSocketClientProtocol() .getReceivedQueueMessages().size()))); - String uuid = ProxySystem.getInstance().getFormatter() - .formatUuid(player.getUniqueId().toString()); + String uuid = ProxySystem.getInstance().getFormatter().formatUuid(player.getUniqueId()); RMap map = ProxySystem.getInstance().getRedisManager() .getRedissonClient() diff --git a/src/main/java/net/krakatoa/proxy/listener/PlayerDisconnectListener.java b/src/main/java/net/krakatoa/proxy/listener/PlayerDisconnectListener.java index bbe84a7..67a7456 100755 --- a/src/main/java/net/krakatoa/proxy/listener/PlayerDisconnectListener.java +++ b/src/main/java/net/krakatoa/proxy/listener/PlayerDisconnectListener.java @@ -11,6 +11,7 @@ import net.md_5.bungee.api.connection.ProxiedPlayer; import net.md_5.bungee.api.event.PlayerDisconnectEvent; import net.md_5.bungee.api.plugin.Listener; import net.md_5.bungee.event.EventHandler; +import org.bson.Document; import org.bson.conversions.Bson; import org.redisson.api.RMap; @@ -29,25 +30,29 @@ public class PlayerDisconnectListener implements Listener { .getMap("player:" + uuid); // mongo - ProxySystem.getInstance().getMongoManager().getPlayers() - .find(Filters.eq("uuid", uuid)) - .first((document, throwable) -> { - if (document != null) { - List updatesList = new ArrayList<>(); + Document document = (Document) ProxySystem.getInstance().getMongoManager() + .getPlayersCollection() + .find(Filters.eq("uuid", uuid)).first(); - map.forEach((key, value) -> { - if (!Objects.equals(key, "_id") && !Objects.equals(key, "uuid")) { - updatesList.add(Updates.set(key, value)); - } - }); + if (document != null) { + List updatesList = new ArrayList<>(); - Bson updates = Updates.combine(updatesList); + map.forEach((key, value) -> { + if (!Objects.equals(key, "_id") && !Objects.equals(key, "uuid")) { + updatesList.add(Updates.set(key, value)); + } + }); - UpdateOptions options = new UpdateOptions().upsert(true); + Bson updates = Updates.combine(updatesList); - System.out.println("doc: " + document.keySet()); + UpdateOptions options = new UpdateOptions().upsert(true); - ProxySystem.getInstance().getMongoManager().getPlayers() + System.out.println("doc: " + document.keySet()); + + ProxySystem.getInstance().getMongoManager().getPlayersCollection() + .updateOne(Filters.eq("uuid", uuid), updates); +/* + ProxySystem.getInstance().getMongoManager().getPlayersCollection() .updateOne(document, updates, options, (result, t) -> { System.out.println(result == null); @@ -55,11 +60,10 @@ public class PlayerDisconnectListener implements Listener { System.out.println("Modified document count: " + result.getModifiedCount()); System.out.println("Upserted id: " + result.getUpsertedId()); } - }); - } else { - System.out.println("Mongo player is null on disconnect"); - } - }); + }); */ + } else { + System.out.println("Mongo player is null on disconnect"); + } // redis if (!map.isEmpty()) { diff --git a/src/main/java/net/krakatoa/proxy/listener/PostLoginListener.java b/src/main/java/net/krakatoa/proxy/listener/PostLoginListener.java index 1ecbd3f..855e9d0 100755 --- a/src/main/java/net/krakatoa/proxy/listener/PostLoginListener.java +++ b/src/main/java/net/krakatoa/proxy/listener/PostLoginListener.java @@ -37,110 +37,107 @@ public class PostLoginListener implements Listener { String uuid = ProxySystem.getInstance().getFormatter() .formatUuid(proxiedPlayer.getUniqueId().toString()); - ProxySystem.getInstance().getMongoManager().getPlayers() - .find(Filters.eq("uuid", uuid)) - .first((document, throwable) -> { - if (document == null) { // new player - create entries in databases - System.out.println("create player in db"); - String voiceWebCode = generateVoiceWebCode(uuid); + Document document = (Document) ProxySystem.getInstance().getMongoManager() + .getPlayersCollection() + .find(Filters.eq("uuid", uuid)).first(); - document = new Document("uuid", uuid).append("voiceWebCode", voiceWebCode); + if (document == null) { // new player - create entries in databases + System.out.println("create player in db"); + String voiceWebCode = generateVoiceWebCode(uuid); - ProxySystem.getInstance().getMongoManager().getPlayers() - .insertOne(document, (unused, throwable1) -> { - System.out.println("insertOne"); - }); + document = new Document("uuid", uuid).append("voiceWebCode", voiceWebCode); - sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode); + ProxySystem.getInstance().getMongoManager().getPlayersCollection().insertOne(document); - // redis + sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode); - RMap map = ProxySystem.getInstance().getRedisManager() - .getRedissonClient() - .getMap("player:" + uuid); + // redis - if (map.isEmpty()) { - map.put("uuid", uuid); - map.put("voiceWebCode", "voiceWebCode"); + RMap map = ProxySystem.getInstance().getRedisManager() + .getRedissonClient() + .getMap("player:" + uuid); - ProxySystem.getInstance().getLogger().info("PUT !"); - } else { - ProxySystem.getInstance().getLogger() - .warning("REDIS player " + uuid + " exists in the cache, but should not exist"); - } - } else { // player found in the database + if (map.isEmpty()) { + map.put("uuid", uuid); + map.put("voiceWebCode", "voiceWebCode"); - // redis - ProxySystem.getInstance().getLogger().info("mongodb player " + document.keySet()); + ProxySystem.getInstance().getLogger().info("PUT !"); + } else { + ProxySystem.getInstance().getLogger() + .warning("REDIS player " + uuid + " exists in the cache, but should not exist"); + } + } else { // player found in the database - RMap map = ProxySystem.getInstance().getRedisManager() - .getRedissonClient() - .getMap("player:" + uuid); + // redis + ProxySystem.getInstance().getLogger().info("mongodb player " + document.keySet()); - if (map.isEmpty()) { - map.put("uuid", uuid); - map.put("voiceWebCode", "voiceWebCode"); + RMap map = ProxySystem.getInstance().getRedisManager() + .getRedissonClient() + .getMap("player:" + uuid); - document.forEach((key, value) -> { - if (!Objects.equals(key, "_id")) { - ProxySystem.getInstance().getLogger() - .info("key " + key + " value " + value.toString()); - map.put(key, value.toString()); - } - }); - ProxySystem.getInstance().getLogger().info("PUT !"); - } else { - ProxySystem.getInstance().getLogger() - .warning("REDIS player " + uuid + " exists in the cache, but should not exist"); - } + if (map.isEmpty()) { + map.put("uuid", uuid); + map.put("voiceWebCode", "voiceWebCode"); - // check if both sockets connected - int cmdID = ProxySystem.getInstance().getKraSocketClientProtocol().generateCmdID(); - - Document finalDocument = document; - Consumer consumer = value -> { - if (value.getArgs().equals("1")) { - // connected! Send player to game server - System.out.println( - proxiedPlayer.getName() + " sockets connected! Send player to game server"); - proxiedPlayer.sendMessage( - new TextComponent("§bWillkommen zurück §6" + proxiedPlayer.getDisplayName())); - - ProxyServer.getInstance().getScheduler().schedule(ProxySystem.getInstance(), () -> { - ServerInfo serverInfos = ProxyServer.getInstance().getServerInfo("NewYork-1"); - proxiedPlayer.connect(serverInfos); - }, 2L, TimeUnit.SECONDS); - } else { - // not connected! Send voice url - System.out.println( - proxiedPlayer.getName() + " sockets not connected! Send voice url"); - - String voiceWebCode = generateVoiceWebCode(uuid); - - Bson updates = Updates.combine(Updates.set("voiceWebCode", voiceWebCode)); - - UpdateOptions options = new UpdateOptions().upsert(true); - - ProxySystem.getInstance().getMongoManager().getPlayers() - .updateOne(finalDocument, updates, options, (updateResult, throwable1) -> { - }); - - System.out.println("voiceWebCode " + voiceWebCode); - - sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode); - } - - System.out.println( - "consumer response value " + value.getUuid() + " " + value.getCmdID()); - }; - - ProxySystem.getInstance().getKraSocketClient().getSocketClient() - .SendMessage(new KraSocketClientProtocolMessage( - KraSocketClientProtocolStatus.GET.getStatus(), cmdID, - KraSocketClientProtocolDest.BACKEND.getStatus(), proxiedPlayer.getUniqueId(), - (short) 10, "", consumer)); + document.forEach((key, value) -> { + if (!Objects.equals(key, "_id")) { + ProxySystem.getInstance().getLogger() + .info("key " + key + " value " + value.toString()); + map.put(key, value.toString()); } }); + ProxySystem.getInstance().getLogger().info("PUT !"); + } else { + ProxySystem.getInstance().getLogger() + .warning("REDIS player " + uuid + " exists in the cache, but should not exist"); + } + + // check if both sockets connected + int cmdID = ProxySystem.getInstance().getKraSocketClientProtocol().generateCmdID(); + + Document finalDocument = document; + Consumer consumer = value -> { + if (value.getArgs().equals("1")) { + // connected! Send player to game server + System.out.println( + proxiedPlayer.getName() + " sockets connected! Send player to game server"); + proxiedPlayer.sendMessage( + new TextComponent("§bWillkommen zurück §6" + proxiedPlayer.getDisplayName())); + + ProxyServer.getInstance().getScheduler().schedule(ProxySystem.getInstance(), () -> { + ServerInfo serverInfos = ProxyServer.getInstance().getServerInfo("NewYork-1"); + proxiedPlayer.connect(serverInfos); + }, 2L, TimeUnit.SECONDS); + } else { + // not connected! Send voice url + System.out.println( + proxiedPlayer.getName() + " sockets not connected! Send voice url"); + + String voiceWebCode = generateVoiceWebCode(uuid); + + Bson updates = Updates.combine(Updates.set("voiceWebCode", voiceWebCode)); + + UpdateOptions options = new UpdateOptions().upsert(true); + + ProxySystem.getInstance().getMongoManager().getPlayersCollection() + .updateOne(finalDocument, updates, options); + + System.out.println("voiceWebCode " + voiceWebCode); + + sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode); + } + + System.out.println( + "consumer response value " + value.getUuid() + " " + value.getCmdID()); + }; + + ProxySystem.getInstance().getKraSocketClient().getSocketClient() + .SendMessage(new KraSocketClientProtocolMessage( + KraSocketClientProtocolStatus.GET.getStatus(), cmdID, + KraSocketClientProtocolDest.BACKEND.getStatus(), proxiedPlayer.getUniqueId(), + (short) 10, "", consumer)); + } + /* MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();