Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import dev.restate.sdktesting.infra.InjectAdminURI
import dev.restate.sdktesting.infra.InjectClient
import dev.restate.sdktesting.infra.RestateDeployerExtension
import java.net.URI
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
Expand Down Expand Up @@ -52,8 +51,6 @@ class PauseResumeChangingDeploymentTest {
}

companion object {
private val LOG = LogManager.getLogger(PauseResumeChangingDeploymentTest::class.java)

@RegisterExtension
@JvmField
val deployerExt: RestateDeployerExtension = RestateDeployerExtension {
Expand Down Expand Up @@ -95,12 +92,7 @@ class PauseResumeChangingDeploymentTest {
// Resume the paused invocation on the specific endpoint
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
try {
invocationApi.resumeInvocation(invocationId, local.deploymentId)
} catch (e: Exception) {
LOG.error("Failed to resume invocation {}: {}", invocationId, e.message)
throw e
}
retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, local.deploymentId) }

assertThat(sendResult.attachSuspend().response()).isEqualTo("Success in new version!")

Expand Down
10 changes: 1 addition & 9 deletions src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import dev.restate.sdktesting.infra.InjectClient
import dev.restate.sdktesting.infra.RestateDeployerExtension
import java.net.URI
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
Expand Down Expand Up @@ -49,8 +48,6 @@ class PauseResumeTest {
}

companion object {
private val LOG = LogManager.getLogger(PauseResumeTest::class.java)

@RegisterExtension
@JvmField
val deployerExt: RestateDeployerExtension = RestateDeployerExtension {
Expand Down Expand Up @@ -93,12 +90,7 @@ class PauseResumeTest {
// Resume the paused invocation on the specific endpoint
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
try {
invocationApi.resumeInvocation(invocationId, null)
} catch (e: Exception) {
LOG.error("Failed to resume invocation {}: {}", invocationId, e.message)
throw e
}
retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, null) }

assertThat(sendResult.attachSuspend().response()).isEqualTo("input")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ class RestartAsNewInvocationTest {
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
val newInvocationId =
invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null).newInvocationId
retryOnServiceUnavailable {
invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null)
}
.newInvocationId

// Assert this returns the input
val newInvocationResult =
Expand Down Expand Up @@ -154,7 +157,10 @@ class RestartAsNewInvocationTest {
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
val newInvocationId =
invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null).newInvocationId
retryOnServiceUnavailable {
invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null)
}
.newInvocationId

// Assert this returns the input
val newInvocationResult =
Expand Down
16 changes: 16 additions & 0 deletions src/main/kotlin/dev/restate/sdktesting/tests/utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests

import dev.restate.admin.api.DeploymentApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.client.ApiException
import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterHttpDeploymentRequest
import dev.restate.common.RequestBuilder
Expand All @@ -21,6 +22,7 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
Expand All @@ -37,6 +39,7 @@ import kotlinx.serialization.json.Json
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.kotlin.additionalLoggingContext
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility
import org.awaitility.core.ConditionFactory
import org.testcontainers.Testcontainers

Expand All @@ -62,6 +65,19 @@ val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = {
idempotencyKey = UUID.randomUUID().toString()
}

/**
* Retries a block that may throw ApiException with 503 status due to leadership changes. Uses a
* 30-second timeout to stay well under the default 60-second test timeout. Only retries on 503
* errors; other errors are propagated immediately.
*/
fun <T> retryOnServiceUnavailable(block: () -> T): T {
return Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.ignoreExceptionsMatching { e -> e is ApiException && e.code == 503 }
.until({ block() }) { true }
}

/** Data classes for sys_journal query result */
@Serializable data class JournalQueryResult(val rows: List<SysJournalEntry> = emptyList())

Expand Down
Loading