diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt index 3303ba84..efbf1151 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt @@ -22,7 +22,6 @@ import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import java.net.URI -import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias @@ -52,8 +51,6 @@ class PauseResumeChangingDeploymentTest { } companion object { - private val LOG = LogManager.getLogger(PauseResumeChangingDeploymentTest::class.java) - @RegisterExtension @JvmField val deployerExt: RestateDeployerExtension = RestateDeployerExtension { @@ -95,12 +92,7 @@ class PauseResumeChangingDeploymentTest { // Resume the paused invocation on the specific endpoint val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) - try { - invocationApi.resumeInvocation(invocationId, local.deploymentId) - } catch (e: Exception) { - LOG.error("Failed to resume invocation {}: {}", invocationId, e.message) - throw e - } + retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, local.deploymentId) } assertThat(sendResult.attachSuspend().response()).isEqualTo("Success in new version!") diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt index 8f1187c8..d3c06276 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt @@ -21,7 +21,6 @@ import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import java.net.URI import java.util.concurrent.atomic.AtomicBoolean -import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias @@ -49,8 +48,6 @@ class PauseResumeTest { } companion object { - private val LOG = LogManager.getLogger(PauseResumeTest::class.java) - @RegisterExtension @JvmField val deployerExt: RestateDeployerExtension = RestateDeployerExtension { @@ -93,12 +90,7 @@ class PauseResumeTest { // Resume the paused invocation on the specific endpoint val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) - try { - invocationApi.resumeInvocation(invocationId, null) - } catch (e: Exception) { - LOG.error("Failed to resume invocation {}: {}", invocationId, e.message) - throw e - } + retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, null) } assertThat(sendResult.attachSuspend().response()).isEqualTo("input") diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt index f223b715..710771e2 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt @@ -99,7 +99,10 @@ class RestartAsNewInvocationTest { val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) val newInvocationId = - invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null).newInvocationId + retryOnServiceUnavailable { + invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null) + } + .newInvocationId // Assert this returns the input val newInvocationResult = @@ -154,7 +157,10 @@ class RestartAsNewInvocationTest { val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) val invocationApi = InvocationApi(adminClient) val newInvocationId = - invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null).newInvocationId + retryOnServiceUnavailable { + invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null) + } + .newInvocationId // Assert this returns the input val newInvocationResult = diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index 5d5feb9c..77daa31b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests import dev.restate.admin.api.DeploymentApi import dev.restate.admin.client.ApiClient +import dev.restate.admin.client.ApiException import dev.restate.admin.model.RegisterDeploymentRequest import dev.restate.admin.model.RegisterHttpDeploymentRequest import dev.restate.common.RequestBuilder @@ -21,6 +22,7 @@ import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.util.UUID +import java.util.concurrent.TimeUnit import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -37,6 +39,7 @@ import kotlinx.serialization.json.Json import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.kotlin.additionalLoggingContext import org.assertj.core.api.Assertions.assertThat +import org.awaitility.Awaitility import org.awaitility.core.ConditionFactory import org.testcontainers.Testcontainers @@ -62,6 +65,19 @@ val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = { idempotencyKey = UUID.randomUUID().toString() } +/** + * Retries a block that may throw ApiException with 503 status due to leadership changes. Uses a + * 30-second timeout to stay well under the default 60-second test timeout. Only retries on 503 + * errors; other errors are propagated immediately. + */ +fun retryOnServiceUnavailable(block: () -> T): T { + return Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .ignoreExceptionsMatching { e -> e is ApiException && e.code == 503 } + .until({ block() }) { true } +} + /** Data classes for sys_journal query result */ @Serializable data class JournalQueryResult(val rows: List = emptyList())