Skip to content

Commit 9382b56

Browse files
committed
feat: update redis (wip); feat: add suspendingHandler function for commands
1 parent d5f800b commit 9382b56

File tree

14 files changed

+133
-90
lines changed

14 files changed

+133
-90
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66

77
allprojects {
88
group = "org.sayandev"
9-
version = "1.8.9.52"
9+
version = "1.8.9.68"
1010
description = "A modular Kotlin framework for Minecraft: JE"
1111

1212
plugins.apply("maven-publish")

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ implementation-core = [
142142
"configurate-extra-kotlin",
143143
"cloud-core",
144144
"cloud-kotlin-extensions",
145-
"cloud-kotlin-coroutines",
145+
# "cloud-kotlin-coroutines",
146146
"adventure-api",
147147
"adventure-text-minimessage",
148148
"adventure-text-serializer-gson",

stickynote-bukkit/src/main/kotlin/org/sayandev/stickynote/bukkit/command/BukkitCommand.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.incendo.cloud.parser.standard.StringArrayParser
2222
import org.incendo.cloud.parser.standard.StringParser
2323
import org.incendo.cloud.setting.ManagerSetting
2424
import org.incendo.cloud.suggestion.Suggestion
25+
import org.sayandev.stickynote.bukkit.launch
2526
import org.sayandev.stickynote.bukkit.plugin
2627
import org.sayandev.stickynote.bukkit.utils.AdventureUtils
2728
import org.sayandev.stickynote.bukkit.utils.ServerVersion
@@ -103,6 +104,14 @@ abstract class BukkitCommand(
103104
override fun errorPrefix(prefix: Component) {
104105
errorPrefix = prefix
105106
}
107+
108+
fun <S : Any> MutableCommandBuilder<S>.suspendingHandler(context: suspend (CommandContext<S>) -> Unit) {
109+
this.handler {
110+
launch {
111+
context(it)
112+
}
113+
}
114+
}
106115
}
107116

108117
fun CommandContext<BukkitSender>.player(): Player? {

stickynote-bukkit/src/main/kotlin/org/sayandev/stickynote/bukkit/messaging/publisher/PluginMessagePublisher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ abstract class PluginMessagePublisher<P, S>(
4242
}
4343
}
4444

45-
fun publish(player: Player, payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
45+
suspend fun publish(player: Player, payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
4646
player.sendPluginMessage(plugin, this.id(), payloadWrapper.asJson().toByteArray())
4747
return publish(payloadWrapper)
4848
}

stickynote-bukkit/src/main/kotlin/org/sayandev/stickynote/bukkit/messaging/subscriber/PluginMessageSubscribeListener.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ import org.bukkit.entity.Player
44
import org.bukkit.plugin.messaging.PluginMessageListener
55
import org.sayandev.stickynote.bukkit.messaging.publisher.PluginMessagePublisher
66
import org.sayandev.stickynote.bukkit.plugin
7-
import org.sayandev.stickynote.bukkit.warn
87
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper
98
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson
10-
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asOptionalPayloadWrapper
119
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
1210
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload
1311
import org.sayandev.stickynote.core.messaging.publisher.Publisher.Companion.HANDLER_LIST
@@ -43,13 +41,11 @@ class PluginMessageSubscribeListener<P, S>(
4341
PayloadWrapper.State.RESPOND -> {
4442
for (publisher in HANDLER_LIST.filterIsInstance<PluginMessagePublisher<P, S>>()) {
4543
if (publisher.id() == channel) {
46-
val handle = String(data).asOptionalPayloadWrapper<P>()?.typedPayload(payloadClass)?.let { publisher.handle(it) }
4744
publisher.payloads[result.uniqueId]?.apply {
48-
if (handle != null) {
49-
this.complete(result.typedPayload(resultClass))
50-
}
45+
//val handle = String(data).asOptionalPayloadWrapper<P>()?.typedPayload(payloadClass)?.let { publisher.handle(it) }
46+
this.complete(result.typedPayload(resultClass))
5147
publisher.payloads.remove(result.uniqueId)
52-
} ?: throw IllegalStateException("No payload found for uniqueId ${result.uniqueId}")
48+
} /*?: throw IllegalStateException("No payload found for uniqueId ${result.uniqueId}")*/
5349
}
5450
}
5551
}

stickynote-core/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ dependencies {
44
api(libs.configurate.extra.kotlin)
55
api(libs.cloud.core)
66
api(libs.cloud.kotlin.extensions)
7-
api(libs.cloud.kotlin.coroutines)
7+
// api(libs.cloud.kotlin.coroutines)
88
api(libs.adventure.api)
99
api(libs.adventure.text.minimessage)
1010
api(libs.adventure.text.serializer.gson)

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/command/Command.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import org.incendo.cloud.parser.standard.StringParser
1212
import org.incendo.cloud.suggestion.Suggestion
1313
import org.sayandev.stickynote.core.command.interfaces.CommandExtension
1414
import org.sayandev.stickynote.core.command.interfaces.SenderExtension
15+
import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
1516
import java.util.concurrent.CompletableFuture
17+
import kotlin.coroutines.CoroutineContext
1618

1719
abstract class Command<S: SenderExtension<*, *>>(
1820
val rootId: String,
@@ -57,6 +59,14 @@ abstract class Command<S: SenderExtension<*, *>>(
5759
partedPermission.removeAt(0)
5860
permission("${rootId}.commands.${partedPermission.map { it.name() }.distinct().joinToString(".")}")
5961
}
62+
63+
fun <S : Any> MutableCommandBuilder<S>.suspendingHandler(dispatcher: CoroutineContext, context: suspend (CommandContext<S>) -> Unit) {
64+
this.handler {
65+
launch(dispatcher) {
66+
context(it)
67+
}
68+
}
69+
}
6070
}
6171

6272
internal fun CommandComponent.Builder<SenderExtension<*, *>, String>.createStringSuggestion(suggestions: Collection<String>) {

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/messaging/publisher/PayloadWrapper.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.sayandev.stickynote.core.messaging.publisher
22

33
import com.google.gson.Gson
44
import com.google.gson.GsonBuilder
5+
import com.google.gson.JsonIOException
56
import com.google.gson.TypeAdapter
67
import java.util.*
78

@@ -78,7 +79,11 @@ data class PayloadWrapper<P>(
7879
}
7980

8081
fun <P> PayloadWrapper<*>.typedPayload(payloadClass: Class<P>): P {
81-
return gson.fromJson(gson.toJson(this.payload), payloadClass)
82+
return try {
83+
gson.fromJson(gson.toJson(this.payload), payloadClass)
84+
} catch (e: JsonIOException) {
85+
throw IllegalStateException("Could not convert payload to $this", e)
86+
}
8287
}
8388
}
8489
}

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/messaging/publisher/Publisher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ abstract class Publisher<P, S>(
1616
return "$namespace:$name"
1717
}
1818

19-
open fun publish(payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
19+
open suspend fun publish(payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
2020
val deferred = CompletableDeferred<S>()
2121
payloads[payloadWrapper.uniqueId] = deferred
2222

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
package org.sayandev.stickynote.core.messaging.publisher
22

3-
import com.google.common.util.concurrent.ThreadFactoryBuilder
4-
import kotlinx.coroutines.CompletableDeferred;
5-
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson;
6-
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper;
7-
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload;
8-
import redis.clients.jedis.JedisPool;
9-
import redis.clients.jedis.JedisPubSub;
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson
6+
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
7+
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload
8+
import org.sayandev.stickynote.core.messaging.subscriber.RedisSubscriber
9+
import org.sayandev.stickynote.core.utils.CoroutineUtils.awaitWithTimeout
10+
import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
11+
import redis.clients.jedis.JedisPool
12+
import redis.clients.jedis.JedisPubSub
1013
import java.util.*
11-
import java.util.concurrent.*;
12-
import java.util.logging.Logger;
14+
import java.util.logging.Logger
1315

1416
abstract class RedisPublisher<P, S>(
17+
val dispatcher: CoroutineDispatcher,
1518
val redis: JedisPool,
1619
namespace: String,
1720
name: String,
21+
val payloadClass: Class<P>,
1822
val resultClass: Class<S>,
1923
logger: Logger
2024
) : Publisher<P, S>(
@@ -26,16 +30,35 @@ abstract class RedisPublisher<P, S>(
2630

2731
private val subJedis = redis.resource
2832
private val pubJedis = redis.resource
29-
private val executor = Executors.newSingleThreadExecutor(ThreadFactoryBuilder().setNameFormat("redis-pub-pub-thread-${channel}-%d").build())
30-
private val TIMEOUT_SECONDS = 5L
3133

3234
init {
3335
val pubSub = object : JedisPubSub() {
3436
override fun onMessage(channel: String, message: String) {
3537
if (channel != this@RedisPublisher.channel) return
3638
val result = message.asPayloadWrapper<S>()
3739
when (result.state) {
40+
PayloadWrapper.State.FORWARD -> {
41+
val wrappedPayload = message.asPayloadWrapper<P>()
42+
println("Before ${HANDLER_LIST.flatMap { publisher -> publisher.payloads.keys }.joinToString(", ")}")
43+
println("UniqueId: ${wrappedPayload.uniqueId}")
44+
println("Exclude: ${wrappedPayload.excludeSource} isSource: ${isSource(wrappedPayload.uniqueId)}")
45+
if (wrappedPayload.excludeSource && isSource(wrappedPayload.uniqueId)) return
46+
println("After")
47+
val payloadResult = handle(wrappedPayload.typedPayload(payloadClass)) ?: return
48+
pubJedis.publish(
49+
channel.toByteArray(),
50+
PayloadWrapper(
51+
wrappedPayload.uniqueId,
52+
payloadResult,
53+
PayloadWrapper.State.RESPOND,
54+
wrappedPayload.source,
55+
wrappedPayload.target,
56+
wrappedPayload.excludeSource
57+
).asJson().toByteArray()
58+
)
59+
}
3860
PayloadWrapper.State.RESPOND -> {
61+
println("Respond")
3962
for (publisher in HANDLER_LIST.filterIsInstance<RedisPublisher<P, S>>()) {
4063
if (publisher.id() == channel) {
4164
publisher.payloads[result.uniqueId]?.apply {
@@ -46,32 +69,29 @@ abstract class RedisPublisher<P, S>(
4669
}
4770
}
4871
PayloadWrapper.State.PROXY -> {}
49-
else -> {}
5072
}
5173
}
52-
};
74+
}
5375
Thread({ subJedis.subscribe(pubSub, channel) }, "redis-pub-sub-thread-${channel}-${UUID.randomUUID().toString().split("-").first()}").start()
5476
}
5577

56-
override fun publish(payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
57-
val future = executor.submit<Boolean> {
58-
pubJedis.publish(channel.toByteArray(), payloadWrapper.asJson().toByteArray());
59-
return@submit true;
60-
}
78+
override suspend fun publish(payload: PayloadWrapper<P>): CompletableDeferred<S> {
79+
val result = super.publish(payload)
6180

62-
try {
63-
if (!future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
64-
logger.warning("failed to publish payload `${payloadWrapper}` within $TIMEOUT_SECONDS seconds.")
65-
}
66-
} catch (e: TimeoutException) {
67-
logger.warning("failed to publish payload `${payloadWrapper}` after $TIMEOUT_SECONDS seconds. (timed-out)")
68-
future.cancel(true)
69-
} catch (e: Exception) {
70-
e.printStackTrace()
81+
launch(dispatcher) {
82+
pubJedis.publish(channel.toByteArray(), payload.asJson().toByteArray())
7183
}
7284

73-
return super.publish(payloadWrapper)
85+
return result
7486
}
7587

7688
abstract fun handle(payload: P): S?
89+
90+
fun isSource(uniqueId: UUID): Boolean {
91+
return HANDLER_LIST.flatMap { publisher -> publisher.payloads.keys }.contains(uniqueId)
92+
}
93+
94+
companion object {
95+
const val TIMEOUT_SECONDS = 5L
96+
}
7797
}

0 commit comments

Comments
 (0)