mongodb reactive stream driver

master
alex 2022-04-10 20:17:31 +02:00
parent 84ade4ac37
commit 1ca0a07b3a
5 changed files with 247 additions and 240 deletions

View File

@ -90,6 +90,11 @@
<artifactId>KraSocketClient</artifactId> <artifactId>KraSocketClient</artifactId>
<version>1.2-SNAPSHOT</version> <version>1.2-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>earth.krakatoa</groupId>
<artifactId>KraCore</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<repositories> <repositories>

View File

@ -5,6 +5,11 @@ import earth.krakatao.KraSocketClientConfig;
import earth.krakatao.events.KraSocketClientEventInitiater; import earth.krakatao.events.KraSocketClientEventInitiater;
import earth.krakatao.events.KraSocketClientEventInterface; import earth.krakatao.events.KraSocketClientEventInterface;
import earth.krakatao.protocol.KraSocketClientProtocol; import earth.krakatao.protocol.KraSocketClientProtocol;
import earth.krakatoa.core.config.MongoConfig;
import earth.krakatoa.core.config.RedisConfig;
import earth.krakatoa.core.mongo.MongoManager;
import earth.krakatoa.core.redis.RedisManager;
import earth.krakatoa.core.util.Formatter;
import java.util.Arrays; import java.util.Arrays;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -16,9 +21,6 @@ import net.krakatoa.proxy.listener.LoginListener;
import net.krakatoa.proxy.listener.PlayerDisconnectListener; import net.krakatoa.proxy.listener.PlayerDisconnectListener;
import net.krakatoa.proxy.listener.PostLoginListener; import net.krakatoa.proxy.listener.PostLoginListener;
import net.krakatoa.proxy.listener.SocketClientMessageListener; import net.krakatoa.proxy.listener.SocketClientMessageListener;
import net.krakatoa.proxy.mongo.MongoManager;
import net.krakatoa.proxy.redis.RedisManager;
import net.krakatoa.proxy.util.Formatter;
import net.md_5.bungee.api.plugin.Plugin; import net.md_5.bungee.api.plugin.Plugin;
import net.md_5.bungee.api.plugin.PluginManager; import net.md_5.bungee.api.plugin.PluginManager;
@ -62,12 +64,13 @@ public class ProxySystem extends Plugin {
this.kraSocketClient.getSocketClient().connect(); this.kraSocketClient.getSocketClient().connect();
this.mongoManager = new MongoManager(this.configHandler.getMongodbHost(), this.mongoManager = new MongoManager(
this.configHandler.getMongodbPort(), this.configHandler.getMongodbUsername(), new MongoConfig(this.configHandler.getMongodbHost(), this.configHandler.getMongodbPort(),
this.configHandler.getMongodbPassword()); this.configHandler.getMongodbUsername(), this.configHandler.getMongodbPassword(),
this.mongoManager.connect(this.configHandler.getMongodbDatabase()); this.configHandler.getMongodbDatabase()));
this.redisManager = new RedisManager("redis://127.0.0.1:6379"); this.redisManager = new RedisManager(
new RedisConfig(this.configHandler.getRedisHost(), this.configHandler.getRedisPort()));
this.redisManager.connect(); this.redisManager.connect();
loadListeners(); loadListeners();
@ -79,9 +82,8 @@ public class ProxySystem extends Plugin {
this.kraSocketClient.getSocketClient().getConnection().close(); this.kraSocketClient.getSocketClient().getConnection().close();
this.kraSocketClient.getSocketClient().getConnection().closeConnection(1, ""); this.kraSocketClient.getSocketClient().getConnection().closeConnection(1, "");
this.mongoManager.getMongoClient().close(); this.mongoManager.shutdown();
this.redisManager.shutdown();
this.redisManager.close();
instance = null; instance = null;
} }

View File

@ -33,8 +33,7 @@ public class KrakatoapCommand extends Command {
.getKraSocketClientProtocol() .getKraSocketClientProtocol()
.getReceivedQueueMessages().size()))); .getReceivedQueueMessages().size())));
String uuid = ProxySystem.getInstance().getFormatter() String uuid = ProxySystem.getInstance().getFormatter().formatUuid(player.getUniqueId());
.formatUuid(player.getUniqueId().toString());
RMap<String, String> map = ProxySystem.getInstance().getRedisManager() RMap<String, String> map = ProxySystem.getInstance().getRedisManager()
.getRedissonClient() .getRedissonClient()

View File

@ -11,6 +11,7 @@ import net.md_5.bungee.api.connection.ProxiedPlayer;
import net.md_5.bungee.api.event.PlayerDisconnectEvent; import net.md_5.bungee.api.event.PlayerDisconnectEvent;
import net.md_5.bungee.api.plugin.Listener; import net.md_5.bungee.api.plugin.Listener;
import net.md_5.bungee.event.EventHandler; import net.md_5.bungee.event.EventHandler;
import org.bson.Document;
import org.bson.conversions.Bson; import org.bson.conversions.Bson;
import org.redisson.api.RMap; import org.redisson.api.RMap;
@ -29,9 +30,10 @@ public class PlayerDisconnectListener implements Listener {
.getMap("player:" + uuid); .getMap("player:" + uuid);
// mongo // mongo
ProxySystem.getInstance().getMongoManager().getPlayers() Document document = (Document) ProxySystem.getInstance().getMongoManager()
.find(Filters.eq("uuid", uuid)) .getPlayersCollection()
.first((document, throwable) -> { .find(Filters.eq("uuid", uuid)).first();
if (document != null) { if (document != null) {
List<Bson> updatesList = new ArrayList<>(); List<Bson> updatesList = new ArrayList<>();
@ -47,7 +49,10 @@ public class PlayerDisconnectListener implements Listener {
System.out.println("doc: " + document.keySet()); System.out.println("doc: " + document.keySet());
ProxySystem.getInstance().getMongoManager().getPlayers() ProxySystem.getInstance().getMongoManager().getPlayersCollection()
.updateOne(Filters.eq("uuid", uuid), updates);
/*
ProxySystem.getInstance().getMongoManager().getPlayersCollection()
.updateOne(document, updates, options, (result, t) -> { .updateOne(document, updates, options, (result, t) -> {
System.out.println(result == null); System.out.println(result == null);
@ -55,11 +60,10 @@ public class PlayerDisconnectListener implements Listener {
System.out.println("Modified document count: " + result.getModifiedCount()); System.out.println("Modified document count: " + result.getModifiedCount());
System.out.println("Upserted id: " + result.getUpsertedId()); System.out.println("Upserted id: " + result.getUpsertedId());
} }
}); }); */
} else { } else {
System.out.println("Mongo player is null on disconnect"); System.out.println("Mongo player is null on disconnect");
} }
});
// redis // redis
if (!map.isEmpty()) { if (!map.isEmpty()) {

View File

@ -37,19 +37,17 @@ public class PostLoginListener implements Listener {
String uuid = ProxySystem.getInstance().getFormatter() String uuid = ProxySystem.getInstance().getFormatter()
.formatUuid(proxiedPlayer.getUniqueId().toString()); .formatUuid(proxiedPlayer.getUniqueId().toString());
ProxySystem.getInstance().getMongoManager().getPlayers() Document document = (Document) ProxySystem.getInstance().getMongoManager()
.find(Filters.eq("uuid", uuid)) .getPlayersCollection()
.first((document, throwable) -> { .find(Filters.eq("uuid", uuid)).first();
if (document == null) { // new player - create entries in databases if (document == null) { // new player - create entries in databases
System.out.println("create player in db"); System.out.println("create player in db");
String voiceWebCode = generateVoiceWebCode(uuid); String voiceWebCode = generateVoiceWebCode(uuid);
document = new Document("uuid", uuid).append("voiceWebCode", voiceWebCode); document = new Document("uuid", uuid).append("voiceWebCode", voiceWebCode);
ProxySystem.getInstance().getMongoManager().getPlayers() ProxySystem.getInstance().getMongoManager().getPlayersCollection().insertOne(document);
.insertOne(document, (unused, throwable1) -> {
System.out.println("insertOne");
});
sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode); sendVoiceWebCodeUrl(proxiedPlayer, voiceWebCode);
@ -121,9 +119,8 @@ public class PostLoginListener implements Listener {
UpdateOptions options = new UpdateOptions().upsert(true); UpdateOptions options = new UpdateOptions().upsert(true);
ProxySystem.getInstance().getMongoManager().getPlayers() ProxySystem.getInstance().getMongoManager().getPlayersCollection()
.updateOne(finalDocument, updates, options, (updateResult, throwable1) -> { .updateOne(finalDocument, updates, options);
});
System.out.println("voiceWebCode " + voiceWebCode); System.out.println("voiceWebCode " + voiceWebCode);
@ -140,7 +137,7 @@ public class PostLoginListener implements Listener {
KraSocketClientProtocolDest.BACKEND.getStatus(), proxiedPlayer.getUniqueId(), KraSocketClientProtocolDest.BACKEND.getStatus(), proxiedPlayer.getUniqueId(),
(short) 10, "", consumer)); (short) 10, "", consumer));
} }
});
/* /*
MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();