From ae78c02ebe7b9b4a55e1a9f114c35171ab8113c8 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 12 Jan 2026 17:23:58 +0100 Subject: [PATCH] Fix flaky tests by retrying admin API calls on 503 errors Add retryOnServiceUnavailable helper that uses Awaitility to retry admin API calls that fail with 503 Service Unavailable due to leadership changes in multi-node clusters. Applied to: - PauseResumeTest: resumeInvocation - PauseResumeChangingDeploymentTest: resumeInvocation - RestartAsNewInvocationTest: restartAsNewInvocation (2 locations) Fixes #401 --- .../tests/PauseResumeChangingDeploymentTest.kt | 10 +--------- .../restate/sdktesting/tests/PauseResumeTest.kt | 10 +--------- .../tests/RestartAsNewInvocationTest.kt | 10 ++++++++-- .../kotlin/dev/restate/sdktesting/tests/utils.kt | 16 ++++++++++++++++ 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt index 3303ba84..efbf1151 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt @@ -22,7 +22,6 @@ import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import java.net.URI -import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias @@ -52,8 +51,6 @@ class PauseResumeChangingDeploymentTest { } companion object { - private val LOG = LogManager.getLogger(PauseResumeChangingDeploymentTest::class.java) - @RegisterExtension @JvmField val deployerExt: RestateDeployerExtension = RestateDeployerExtension { @@ -95,12 +92,7 @@ class PauseResumeChangingDeploymentTest { // Resume the paused invocation on the specific 
endpoint val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) - try { - invocationApi.resumeInvocation(invocationId, local.deploymentId) - } catch (e: Exception) { - LOG.error("Failed to resume invocation {}: {}", invocationId, e.message) - throw e - } + retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, local.deploymentId) } assertThat(sendResult.attachSuspend().response()).isEqualTo("Success in new version!") diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt index 8f1187c8..d3c06276 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt @@ -21,7 +21,6 @@ import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import java.net.URI import java.util.concurrent.atomic.AtomicBoolean -import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias @@ -49,8 +48,6 @@ class PauseResumeTest { } companion object { - private val LOG = LogManager.getLogger(PauseResumeTest::class.java) - @RegisterExtension @JvmField val deployerExt: RestateDeployerExtension = RestateDeployerExtension { @@ -93,12 +90,7 @@ class PauseResumeTest { // Resume the paused invocation on the specific endpoint val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) - try { - invocationApi.resumeInvocation(invocationId, null) - } catch (e: Exception) { - LOG.error("Failed to resume invocation {}: {}", invocationId, e.message) - throw e - } + retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, null) } assertThat(sendResult.attachSuspend().response()).isEqualTo("input") diff --git 
a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt index f223b715..710771e2 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt @@ -99,7 +99,10 @@ class RestartAsNewInvocationTest { val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) val newInvocationId = - invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null).newInvocationId + retryOnServiceUnavailable { + invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null) + } + .newInvocationId // Assert this returns the input val newInvocationResult = @@ -154,7 +157,10 @@ class RestartAsNewInvocationTest { val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) val newInvocationId = - invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null).newInvocationId + retryOnServiceUnavailable { + invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null) + } + .newInvocationId // Assert this returns the input val newInvocationResult = diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index 5d5feb9c..77daa31b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests import dev.restate.admin.api.DeploymentApi import dev.restate.admin.client.ApiClient +import dev.restate.admin.client.ApiException import dev.restate.admin.model.RegisterDeploymentRequest import dev.restate.admin.model.RegisterHttpDeploymentRequest import dev.restate.common.RequestBuilder @@ -21,6 +22,7 @@ import java.net.http.HttpClient import 
java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.UUID +import java.util.concurrent.TimeUnit import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -37,6 +39,7 @@ import kotlinx.serialization.json.Json import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.kotlin.additionalLoggingContext import org.assertj.core.api.Assertions.assertThat +import org.awaitility.Awaitility import org.awaitility.core.ConditionFactory import org.testcontainers.Testcontainers @@ -62,6 +65,19 @@ val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = { idempotencyKey = UUID.randomUUID().toString() } +/** + * Retries a block that may throw ApiException with 503 status due to leadership changes. Uses a + * 30-second timeout to stay well under the default 60-second test timeout. Only retries on 503 + * errors; other errors are propagated immediately. + */ +fun <T> retryOnServiceUnavailable(block: () -> T): T { + return Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .ignoreExceptionsMatching { e -> e is ApiException && e.code == 503 } + .until({ block() }) { true } +} + /** Data classes for sys_journal query result */ @Serializable data class JournalQueryResult(val rows: List = emptyList())