Skip to content

Commit c2445c1

Browse files
committed
chore: optimize redis message threading
1 parent aa1c917 commit c2445c1

File tree

4 files changed

+241
-218
lines changed

4 files changed

+241
-218
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ plugins {
88

99
allprojects {
1010
group = "org.sayandev"
11-
version = "1.10.4.60"
11+
version = "1.10.4.60-REDIS-EXPERIMENT"
1212
description = "A modular Kotlin framework for Minecraft: JE"
1313

1414
plugins.apply("maven-publish")

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/messaging/publisher/RedisPublisher.kt

Lines changed: 36 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ import org.sayandev.stickynote.core.coroutine.dispatcher.AsyncDispatcher
77
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson
88
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
99
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload
10+
import org.sayandev.stickynote.core.messaging.redis.RedisConnectionManager
1011
import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
1112
import redis.clients.jedis.JedisPool
12-
import redis.clients.jedis.JedisPubSub
13-
import redis.clients.jedis.exceptions.JedisException
1413
import java.util.*
15-
import java.util.concurrent.atomic.AtomicBoolean
1614
import java.util.logging.Level
1715
import java.util.logging.Logger
1816

@@ -30,115 +28,61 @@ abstract class RedisPublisher<P, S>(
3028
name
3129
) {
3230
val channel = "$namespace:$name"
33-
private var subJedis = redis.resource
34-
private var subscriberThread: Thread? = null
35-
private val isSubscribed = AtomicBoolean(false)
36-
private val shouldReconnect = AtomicBoolean(true)
37-
private val pubSub = createPubSub()
3831

3932
init {
40-
startSubscriber()
33+
RedisConnectionManager.registerNamespace(namespace, redis, dispatcher)
4134
}
4235

43-
private fun createPubSub(): JedisPubSub {
44-
return object : JedisPubSub() {
45-
override fun onMessage(channel: String, message: String) {
46-
if (channel != this@RedisPublisher.channel) return
47-
try {
48-
val result = message.asPayloadWrapper<S>()
49-
when (result.state) {
50-
PayloadWrapper.State.FORWARD -> handleForward(message)
51-
PayloadWrapper.State.RESPOND -> handleResponse(result)
52-
PayloadWrapper.State.PROXY -> {}
53-
}
54-
} catch (e: Exception) {
55-
logger.log(Level.WARNING, "Error processing message: ${e.message}")
56-
}
57-
}
58-
}
59-
}
36+
fun handleForwardMessage(payloadWrapper: PayloadWrapper<Any>) {
37+
if (payloadWrapper.excludeSource && isSource(payloadWrapper.uniqueId)) return
6038

61-
private fun handleForward(message: String) {
62-
val wrappedPayload = message.asPayloadWrapper<P>()
63-
if (wrappedPayload.excludeSource && isSource(wrappedPayload.uniqueId)) return
64-
val payloadResult = handle(wrappedPayload.typedPayload(payloadClass)) ?: return
65-
66-
val localJedis = redis.resource
6739
try {
68-
localJedis.publish(
69-
channel.toByteArray(),
70-
PayloadWrapper(
71-
wrappedPayload.uniqueId,
72-
payloadResult,
73-
PayloadWrapper.State.RESPOND,
74-
wrappedPayload.source,
75-
wrappedPayload.target,
76-
wrappedPayload.excludeSource
77-
).asJson().toByteArray()
78-
)
79-
} finally {
80-
localJedis.close()
81-
}
82-
}
40+
val typedPayload = payloadWrapper.typedPayload(payloadClass)
41+
val payloadResult = handle(typedPayload) ?: return
8342

84-
private fun handleResponse(result: PayloadWrapper<S>) {
85-
for (publisher in HANDLER_LIST.filterIsInstance<RedisPublisher<P, S>>()) {
86-
if (publisher.id() == channel) {
87-
publisher.payloads[result.uniqueId]?.apply {
88-
this.complete(result.typedPayload(resultClass))
89-
publisher.payloads.remove(result.uniqueId)
90-
}
43+
val localJedis = redis.resource
44+
try {
45+
localJedis.publish(
46+
channel.toByteArray(),
47+
PayloadWrapper(
48+
payloadWrapper.uniqueId,
49+
50+
payloadResult,
51+
PayloadWrapper.State.RESPOND,
52+
payloadWrapper.source,
53+
payloadWrapper.target,
54+
payloadWrapper.excludeSource
55+
).asJson().toByteArray()
56+
)
57+
} finally {
58+
localJedis.close()
9159
}
60+
} catch (e: Exception) {
61+
logger.log(Level.WARNING, "Error handling forward message: ${e.message}")
9262
}
9363
}
9464

95-
private fun startSubscriber() {
96-
if (!shouldReconnect.get() || isSubscribed.get()) return
97-
98-
synchronized(this) {
99-
if (isSubscribed.get()) return
100-
101-
subscriberThread?.interrupt()
102-
subscriberThread = Thread({
103-
while (shouldReconnect.get()) {
104-
try {
105-
subJedis = redis.resource
106-
isSubscribed.set(true)
107-
subJedis.subscribe(pubSub, channel)
108-
} catch (e: JedisException) {
109-
logger.log(Level.WARNING, "Redis connection lost: ${e.message}")
110-
isSubscribed.set(false)
111-
safeCloseJedis()
112-
Thread.sleep(5000) // Wait before reconnecting
113-
} catch (e: Exception) {
114-
logger.log(Level.SEVERE, "Unexpected error in subscriber: ${e.message}")
115-
isSubscribed.set(false)
116-
safeCloseJedis()
117-
Thread.sleep(5000)
118-
}
119-
}
120-
}, "redis-pub-sub-thread-${channel}-${UUID.randomUUID().toString().split("-").first()}")
121-
subscriberThread?.start()
122-
}
123-
}
124-
125-
private fun safeCloseJedis() {
65+
fun handleResponseMessage(payloadWrapper: PayloadWrapper<Any>) {
12666
try {
127-
subJedis.close()
67+
val typedResult = payloadWrapper.typedPayload(resultClass)
68+
payloads[payloadWrapper.uniqueId]?.apply {
69+
this.complete(typedResult)
70+
payloads.remove(payloadWrapper.uniqueId)
71+
}
12872
} catch (e: Exception) {
129-
logger.log(Level.WARNING, "Error closing Jedis connection: ${e.message}")
73+
logger.log(Level.WARNING, "Error handling response message: ${e.message}")
13074
}
13175
}
13276

133-
override suspend fun publish(payload: PayloadWrapper<P>): CompletableDeferred<S> {
134-
val result = super.publish(payload)
77+
override suspend fun publish(payloadWrapper: PayloadWrapper<P>): CompletableDeferred<S> {
78+
val result = super.publish(payloadWrapper)
13579

13680
launch(dispatcher) {
13781
val localJedis = redis.resource
13882
try {
139-
val published = localJedis.publish(channel.toByteArray(), payload.asJson().toByteArray())
83+
val published = localJedis.publish(channel.toByteArray(), payloadWrapper.asJson().toByteArray())
14084
if (published <= 0) {
141-
payloads.remove(payload.uniqueId)
85+
payloads.remove(payloadWrapper.uniqueId)
14286
return@launch
14387
}
14488
} finally {
@@ -148,7 +92,7 @@ abstract class RedisPublisher<P, S>(
14892
delay(TIMEOUT_SECONDS * 1000L)
14993
if (result.isActive) {
15094
result.completeExceptionally(IllegalStateException("No response received in $TIMEOUT_SECONDS seconds"))
151-
payloads.remove(payload.uniqueId)
95+
payloads.remove(payloadWrapper.uniqueId)
15296
}
15397
}
15498

@@ -162,26 +106,7 @@ abstract class RedisPublisher<P, S>(
162106
.flatMap { publisher -> publisher.payloads.keys.asSequence() }.contains(uniqueId)
163107
}
164108

165-
fun shutdown() {
166-
shouldReconnect.set(false)
167-
pubSub.unsubscribe()
168-
safeCloseJedis()
169-
subscriberThread?.interrupt()
170-
}
171-
172109
companion object {
173110
const val TIMEOUT_SECONDS = 5L
174-
175-
init {
176-
launch(AsyncDispatcher("pub-debug-memory", 1)) {
177-
while (true) {
178-
val amount = HANDLER_LIST.asSequence().sumOf { it.payloads.asSequence().count() }
179-
delay(60 * 5 * 1000)
180-
if (amount < 100 && amount > 0) return@launch
181-
println("Current payload amount (pub): $amount")
182-
delay(30_000)
183-
}
184-
}
185-
}
186111
}
187112
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package org.sayandev.stickynote.core.messaging.redis
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper
5+
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
6+
import org.sayandev.stickynote.core.messaging.publisher.Publisher
7+
import org.sayandev.stickynote.core.messaging.publisher.RedisPublisher
8+
import org.sayandev.stickynote.core.messaging.subscriber.RedisSubscriber
9+
import org.sayandev.stickynote.core.messaging.subscriber.Subscriber
10+
import redis.clients.jedis.JedisPool
11+
import redis.clients.jedis.JedisPubSub
12+
import redis.clients.jedis.exceptions.JedisException
13+
import java.util.*
14+
import java.util.concurrent.ConcurrentHashMap
15+
import java.util.concurrent.atomic.AtomicBoolean
16+
import java.util.logging.Level
17+
import java.util.logging.Logger
18+
19+
object RedisConnectionManager {
20+
private val namespaceSubscriptions = ConcurrentHashMap<String, NamespaceSubscription>()
21+
private val logger = Logger.getLogger(RedisConnectionManager::class.java.name)
22+
23+
fun registerNamespace(
24+
namespace: String,
25+
redis: JedisPool,
26+
dispatcher: CoroutineDispatcher
27+
) {
28+
namespaceSubscriptions.computeIfAbsent(namespace) {
29+
NamespaceSubscription(namespace, redis, dispatcher)
30+
}
31+
}
32+
33+
fun unregisterNamespace(namespace: String) {
34+
namespaceSubscriptions[namespace]?.shutdown()
35+
namespaceSubscriptions.remove(namespace)
36+
}
37+
38+
fun getSubscription(namespace: String): NamespaceSubscription? {
39+
return namespaceSubscriptions[namespace]
40+
}
41+
42+
fun shutdown() {
43+
namespaceSubscriptions.values.forEach { it.shutdown() }
44+
namespaceSubscriptions.clear()
45+
}
46+
47+
class NamespaceSubscription(
48+
private val namespace: String,
49+
private val redis: JedisPool,
50+
private val dispatcher: CoroutineDispatcher
51+
) {
52+
private val pattern = "$namespace:*"
53+
private var subJedis = redis.resource
54+
private var subscriberThread: Thread? = null
55+
private val isSubscribed = AtomicBoolean(false)
56+
private val shouldReconnect = AtomicBoolean(true)
57+
private val pubSub = createPubSub()
58+
59+
init {
60+
startSubscriber()
61+
}
62+
63+
private fun createPubSub(): JedisPubSub {
64+
return object : JedisPubSub() {
65+
override fun onPMessage(pattern: String, channel: String, message: String) {
66+
if (!channel.startsWith("$namespace:")) return
67+
68+
try {
69+
val channelName = channel.substringAfter("$namespace:")
70+
handleMessage(channelName, message)
71+
} catch (e: Exception) {
72+
logger.log(Level.WARNING, "Error processing message on channel $channel: ${e.message}")
73+
}
74+
}
75+
}
76+
}
77+
78+
private fun handleMessage(channelName: String, message: String) {
79+
try {
80+
val payloadWrapper = message.asPayloadWrapper<Any>()
81+
82+
when (payloadWrapper.state) {
83+
PayloadWrapper.State.FORWARD -> handleForwardMessage(channelName, payloadWrapper)
84+
PayloadWrapper.State.RESPOND -> handleResponseMessage(channelName, payloadWrapper)
85+
PayloadWrapper.State.PROXY -> handleProxyMessage(channelName, payloadWrapper)
86+
}
87+
} catch (e: Exception) {
88+
logger.log(Level.WARNING, "Error parsing message: ${e.message}")
89+
}
90+
}
91+
92+
private fun handleForwardMessage(channelName: String, payloadWrapper: PayloadWrapper<Any>) {
93+
// Find matching publisher
94+
Publisher.HANDLER_LIST
95+
.filterIsInstance<RedisPublisher<*, *>>()
96+
.find { it.namespace == namespace && it.name == channelName }
97+
?.handleForwardMessage(payloadWrapper)
98+
99+
// Find matching subscriber
100+
Subscriber.HANDLER_LIST
101+
.filterIsInstance<RedisSubscriber<*, *>>()
102+
.find { it.namespace == namespace && it.name == channelName }
103+
?.handleForwardMessage(payloadWrapper)
104+
}
105+
106+
private fun handleResponseMessage(channelName: String, payloadWrapper: PayloadWrapper<Any>) {
107+
Publisher.HANDLER_LIST
108+
.filterIsInstance<RedisPublisher<*, *>>()
109+
.filter { it.namespace == namespace && it.name == channelName }
110+
.forEach { it.handleResponseMessage(payloadWrapper) }
111+
}
112+
113+
private fun handleProxyMessage(channelName: String, payloadWrapper: PayloadWrapper<Any>) {
114+
Subscriber.HANDLER_LIST
115+
.filterIsInstance<RedisSubscriber<*, *>>()
116+
.find { it.namespace == namespace && it.name == channelName }
117+
?.handleProxyMessage(payloadWrapper)
118+
}
119+
120+
private fun startSubscriber() {
121+
if (!shouldReconnect.get() || isSubscribed.get()) return
122+
123+
synchronized(this) {
124+
if (isSubscribed.get()) return
125+
126+
subscriberThread?.interrupt()
127+
subscriberThread = Thread({
128+
while (shouldReconnect.get()) {
129+
try {
130+
subJedis = redis.resource
131+
isSubscribed.set(true)
132+
subJedis.psubscribe(pubSub, pattern)
133+
} catch (e: JedisException) {
134+
logger.log(Level.WARNING, "Redis connection lost for namespace $namespace: ${e.message}")
135+
isSubscribed.set(false)
136+
safeCloseJedis()
137+
Thread.sleep(5000)
138+
} catch (e: Exception) {
139+
logger.log(Level.SEVERE, "Unexpected error in subscriber for namespace $namespace: ${e.message}")
140+
isSubscribed.set(false)
141+
safeCloseJedis()
142+
Thread.sleep(5000)
143+
}
144+
}
145+
}, "redis-namespace-sub-$namespace-${UUID.randomUUID().toString().split("-").first()}")
146+
subscriberThread?.start()
147+
}
148+
}
149+
150+
private fun safeCloseJedis() {
151+
try {
152+
subJedis.close()
153+
} catch (e: Exception) {
154+
logger.log(Level.WARNING, "Error closing Jedis connection for namespace $namespace: ${e.message}")
155+
}
156+
}
157+
158+
fun shutdown() {
159+
shouldReconnect.set(false)
160+
pubSub.punsubscribe()
161+
safeCloseJedis()
162+
subscriberThread?.interrupt()
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)