Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import dev.restate.admin.api.DeploymentApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.client.ApiException
import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterDeploymentRequestAnyOf
import dev.restate.admin.model.RegisterHttpDeploymentRequest
import dev.restate.sdk.endpoint.Endpoint
import dev.restate.sdk.http.vertx.RestateHttpServer
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
Expand Down Expand Up @@ -391,7 +391,7 @@ private constructor(
}

private fun discoverDeployment(client: DeploymentApi, uri: String) {
val request = RegisterDeploymentRequest(RegisterDeploymentRequestAnyOf().uri(uri).force(false))
val request = RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(uri).force(false))

val response =
Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,14 @@ class FrontCompatibilityTest {
.dryRun(false))

try {
adminApi.updateDeployment(deployment.id, updateRequest)
adminApi.updateDeployment(deployment.httpDeploymentResponse.id, updateRequest)
LOG.info(
"Successfully updated deployment {} to use URI {}", deployment.id, localEndpointURI)
"Successfully updated deployment {} to use URI {}",
deployment.httpDeploymentResponse.id,
localEndpointURI)
} catch (e: Exception) {
LOG.error("Failed to update deployment {}: {}", deployment.id, e.message)
LOG.error(
"Failed to update deployment {}: {}", deployment.httpDeploymentResponse.id, e.message)
throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate SDK Test suite tool,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE
package dev.restate.sdktesting.tests

import dev.restate.admin.api.InvocationApi
import dev.restate.admin.client.ApiClient
import dev.restate.client.Client
import dev.restate.client.IngressException
import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.Name
import dev.restate.sdk.annotation.Service
import dev.restate.sdk.endpoint.Endpoint
import dev.restate.sdk.kotlin.*
import dev.restate.sdktesting.infra.InjectAdminURI
import dev.restate.sdktesting.infra.InjectClient
import dev.restate.sdktesting.infra.RestateDeployerExtension
import java.net.URI
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.future.await
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.InstanceOfAssertFactories.type
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension

class KillAndRestartInvocationTest {

@Service
@Name("RestartInvocation")
class RestartInvocation {
companion object {
val shouldFail = AtomicBoolean(true)
}

@Handler
suspend fun testHandler(ctx: Context): String {
ctx.runBlock("my-run") { "something" }

// Load if we should fail
val shouldFail = shouldFail.get()
if (shouldFail) {
throw IllegalStateException("Sorry, can't make any progress")
}

return "Done"
}
}

companion object {
@RegisterExtension
val deployerExt: RestateDeployerExtension = RestateDeployerExtension {
withEndpoint(Endpoint.bind(RestartInvocation()))
}
}

@Test
fun killAndRestart(
@InjectClient ingressClient: Client,
@InjectAdminURI adminURI: URI,
) = runTest {
// Create clients for the services
val restartClient =
KillAndRestartInvocationTestRestartInvocationClient.fromClient(ingressClient)

// Send request
val sendResult = restartClient.send().testHandler(init = idempotentCallOptions)
val blockedFirstRequest = sendResult.attachAsync()
val invocationId = sendResult.invocationId()

// Wait for the invocation to reach the error state
await withAlias
"invocation is stuck retrying" untilAsserted
{
assertThat(getInvocationStatus(adminURI, invocationId).rows)
.containsOnly(
SysInvocationEntry(id = invocationId, status = "backing-off", epoch = 0))
}

// Now restart the invocation
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
val result = invocationApi.restartInvocation(invocationId, null, null, null)
assertThat(result.newInvocationEpoch).isEqualTo(1)

// At this point, the invocation is still stuck retrying, but the original attach was unblocked
// with an error
assertThat(runCatching { blockedFirstRequest.await() }.exceptionOrNull())
.asInstanceOf(type(IngressException::class.java))
.returns(471) { it.statusCode }

// Now let's re-attach, then unblock the service and wait for completion
val blockedSecondRequest = sendResult.attachAsync()
RestartInvocation.shouldFail.set(false)

assertThat(blockedSecondRequest.await().response()).isEqualTo("Done")

await withAlias
"got the two invocations in completed state, with same id and different epoch" untilAsserted
{
assertThat(getInvocationStatus(adminURI, invocationId).rows)
.containsExactlyInAnyOrder(
SysInvocationEntry(id = invocationId, status = "completed", epoch = 0),
SysInvocationEntry(id = invocationId, status = "completed", epoch = 1))
}
}
}
79 changes: 79 additions & 0 deletions src/main/kotlin/dev/restate/sdktesting/tests/utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
package dev.restate.sdktesting.tests

import dev.restate.common.RequestBuilder
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.UUID
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.kotlin.additionalLoggingContext
import org.awaitility.core.ConditionFactory
Expand All @@ -40,3 +48,74 @@ fun runTest(timeout: Duration = 60.seconds, testBody: suspend TestScope.() -> Un
val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = {
idempotencyKey = UUID.randomUUID().toString()
}

/** Data classes for sys_journal query result */
@Serializable data class JournalQueryResult(val rows: List<SysJournalEntry> = emptyList())

@Serializable
data class SysJournalEntry(val index: Int, @SerialName("entry_type") val entryType: String)

/** Data classes for sys_invocation query result */
@Serializable data class InvocationQueryResult(val rows: List<SysInvocationEntry> = emptyList())

@Serializable data class SysInvocationEntry(val id: String, val status: String, val epoch: Long)

/** JSON parser with configuration for sys_journal and sys_invocation query results */
private val sysQueryJson = Json {
ignoreUnknownKeys = true
coerceInputValues = true
}

/**
* Queries the sys_journal table for a given invocation ID and returns the parsed result.
*
* @param invocationId The ID of the invocation to query
* @param adminURI The URI of the Restate admin API
* @return The parsed result of the query
*/
suspend fun getJournal(adminURI: URI, invocationId: String): JournalQueryResult {
// Create the HTTP request to query sys_journal
val request =
HttpRequest.newBuilder()
.uri(URI.create("http://${adminURI.host}:${adminURI.port}/query"))
.header("accept", "application/json")
.header("content-type", "application/json")
.POST(
HttpRequest.BodyPublishers.ofString(
"""{"query": "SELECT index, entry_type FROM sys_journal WHERE id = '$invocationId'"}"""))
.build()

// Send the request and get the response
val response =
HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await()

// Parse the response using Kotlin serialization
return sysQueryJson.decodeFromString<JournalQueryResult>(response.body())
}

/**
* Queries the sys_invocation table for a given invocation ID and returns the parsed result.
*
* @param invocationId The ID of the invocation to query
* @param adminURI The URI of the Restate admin API
* @return The parsed result of the query containing invocation status information
*/
suspend fun getInvocationStatus(adminURI: URI, invocationId: String): InvocationQueryResult {
// Create the HTTP request to query sys_invocation
val request =
HttpRequest.newBuilder()
.uri(URI.create("http://${adminURI.host}:${adminURI.port}/query"))
.header("accept", "application/json")
.header("content-type", "application/json")
.POST(
HttpRequest.BodyPublishers.ofString(
"""{"query": "SELECT id, status, epoch FROM sys_invocation WHERE id = '$invocationId'"}"""))
.build()

// Send the request and get the response
val response =
HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await()

// Parse the response using Kotlin serialization
return sysQueryJson.decodeFromString<InvocationQueryResult>(response.body())
}
Loading
Loading