From 3543d31bcfaa29a62c6fe5997d51bb8779f251c6 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 5 Jun 2025 15:00:46 +0200 Subject: [PATCH] Kill and Restart test --- .../sdktesting/infra/RestateDeployer.kt | 4 +- .../tests/FrontCompatibilityTest.kt | 9 +- .../tests/KillAndRestartInvocationTest.kt | 113 +++ .../dev/restate/sdktesting/tests/utils.kt | 79 ++ src/main/openapi/admin.json | 928 ++++++++++++++++-- 5 files changed, 1071 insertions(+), 62 deletions(-) create mode 100644 src/main/kotlin/dev/restate/sdktesting/tests/KillAndRestartInvocationTest.kt diff --git a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt index 85f8fc65..fb586c0e 100644 --- a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt +++ b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt @@ -13,7 +13,7 @@ import dev.restate.admin.api.DeploymentApi import dev.restate.admin.client.ApiClient import dev.restate.admin.client.ApiException import dev.restate.admin.model.RegisterDeploymentRequest -import dev.restate.admin.model.RegisterDeploymentRequestAnyOf +import dev.restate.admin.model.RegisterHttpDeploymentRequest import dev.restate.sdk.endpoint.Endpoint import dev.restate.sdk.http.vertx.RestateHttpServer import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions @@ -391,7 +391,7 @@ private constructor( } private fun discoverDeployment(client: DeploymentApi, uri: String) { - val request = RegisterDeploymentRequest(RegisterDeploymentRequestAnyOf().uri(uri).force(false)) + val request = RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(uri).force(false)) val response = Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) { diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/FrontCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/FrontCompatibilityTest.kt index cff44b71..233c62ad 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/FrontCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/FrontCompatibilityTest.kt @@ -254,11 +254,14 @@ class FrontCompatibilityTest { .dryRun(false)) try { - adminApi.updateDeployment(deployment.id, updateRequest) + adminApi.updateDeployment(deployment.httpDeploymentResponse.id, updateRequest) LOG.info( - "Successfully updated deployment {} to use URI {}", deployment.id, localEndpointURI) + "Successfully updated deployment {} to use URI {}", + deployment.httpDeploymentResponse.id, + localEndpointURI) } catch (e: Exception) { - LOG.error("Failed to update deployment {}: {}", deployment.id, e.message) + LOG.error( + "Failed to update deployment {}: {}", deployment.httpDeploymentResponse.id, e.message) throw e } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KillAndRestartInvocationTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KillAndRestartInvocationTest.kt new file mode 100644 index 00000000..5ec01afa --- /dev/null +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KillAndRestartInvocationTest.kt @@ -0,0 +1,113 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.admin.api.InvocationApi +import dev.restate.admin.client.ApiClient +import dev.restate.client.Client +import dev.restate.client.IngressException +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Name +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.kotlin.* +import dev.restate.sdktesting.infra.InjectAdminURI +import dev.restate.sdktesting.infra.InjectClient +import dev.restate.sdktesting.infra.RestateDeployerExtension +import java.net.URI +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.future.await +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.InstanceOfAssertFactories.type +import org.awaitility.kotlin.await +import org.awaitility.kotlin.withAlias +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +class KillAndRestartInvocationTest { + + @Service + @Name("RestartInvocation") + class RestartInvocation { + companion object { + val shouldFail = AtomicBoolean(true) + } + + @Handler + suspend fun testHandler(ctx: Context): String { + ctx.runBlock("my-run") { "something" } + + // Load if we should fail + val shouldFail = shouldFail.get() + if (shouldFail) { + throw IllegalStateException("Sorry, can't make any progress") + } + + return "Done" + } + } + + companion object { + @RegisterExtension + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withEndpoint(Endpoint.bind(RestartInvocation())) + } + } + + @Test + fun killAndRestart( + @InjectClient ingressClient: Client, + @InjectAdminURI adminURI: URI, + ) = runTest { + // Create clients for the services + val restartClient = + KillAndRestartInvocationTestRestartInvocationClient.fromClient(ingressClient) + + // Send request + val sendResult = restartClient.send().testHandler(init = idempotentCallOptions) + val blockedFirstRequest = sendResult.attachAsync() + val invocationId = sendResult.invocationId() + + // Wait for the invocation to reach the error state + await withAlias + "invocation is stuck retrying" untilAsserted + { + assertThat(getInvocationStatus(adminURI, invocationId).rows) + .containsOnly( + SysInvocationEntry(id = invocationId, status = "backing-off", epoch = 0)) + } + + // Now restart the invocation + val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port) + val invocationApi = InvocationApi(adminClient) + val result = invocationApi.restartInvocation(invocationId, null, null, null) + assertThat(result.newInvocationEpoch).isEqualTo(1) + + // At this point, the invocation is still stuck retrying, but the original attach was unblocked + // with an error + assertThat(runCatching { blockedFirstRequest.await() }.exceptionOrNull()) + .asInstanceOf(type(IngressException::class.java)) + .returns(471) { it.statusCode } + + // Now let's re-attach, then unblock the service and wait for completion + val blockedSecondRequest = sendResult.attachAsync() + RestartInvocation.shouldFail.set(false) + + assertThat(blockedSecondRequest.await().response()).isEqualTo("Done") + + await withAlias + "got the two invocations in completed state, with same id and different epoch" untilAsserted + { + assertThat(getInvocationStatus(adminURI, invocationId).rows) + .containsExactlyInAnyOrder( + SysInvocationEntry(id = invocationId, status = "completed", epoch = 0), + SysInvocationEntry(id = invocationId, status = "completed", epoch = 1)) + } + } +} diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index fc7a1ce5..50ecb5e3 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -9,15 +9,23 @@ package dev.restate.sdktesting.tests import dev.restate.common.RequestBuilder +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.util.UUID import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.future.await import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.kotlin.additionalLoggingContext import org.awaitility.core.ConditionFactory @@ -40,3 +48,74 @@ fun runTest(timeout: Duration = 60.seconds, testBody: suspend TestScope.() -> Un val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = { idempotencyKey = UUID.randomUUID().toString() } + +/** Data classes for sys_journal query result */ +@Serializable data class JournalQueryResult(val rows: List = emptyList()) + +@Serializable +data class SysJournalEntry(val index: Int, @SerialName("entry_type") val entryType: String) + +/** Data classes for sys_invocation query result */ +@Serializable data class InvocationQueryResult(val rows: List = emptyList()) + +@Serializable data class SysInvocationEntry(val id: String, val status: String, val epoch: Long) + +/** JSON parser with configuration for sys_journal and sys_invocation query results */ +private val sysQueryJson = Json { + ignoreUnknownKeys = true + coerceInputValues = true +} + +/** + * Queries the sys_journal table for a given invocation ID and returns the parsed result. + * + * @param invocationId The ID of the invocation to query + * @param adminURI The URI of the Restate admin API + * @return The parsed result of the query + */ +suspend fun getJournal(adminURI: URI, invocationId: String): JournalQueryResult { + // Create the HTTP request to query sys_journal + val request = + HttpRequest.newBuilder() + .uri(URI.create("http://${adminURI.host}:${adminURI.port}/query")) + .header("accept", "application/json") + .header("content-type", "application/json") + .POST( + HttpRequest.BodyPublishers.ofString( + """{"query": "SELECT index, entry_type FROM sys_journal WHERE id = '$invocationId'"}""")) + .build() + + // Send the request and get the response + val response = + HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await() + + // Parse the response using Kotlin serialization + return sysQueryJson.decodeFromString(response.body()) +} + +/** + * Queries the sys_invocation table for a given invocation ID and returns the parsed result. + * + * @param invocationId The ID of the invocation to query + * @param adminURI The URI of the Restate admin API + * @return The parsed result of the query containing invocation status information + */ +suspend fun getInvocationStatus(adminURI: URI, invocationId: String): InvocationQueryResult { + // Create the HTTP request to query sys_invocation + val request = + HttpRequest.newBuilder() + .uri(URI.create("http://${adminURI.host}:${adminURI.port}/query")) + .header("accept", "application/json") + .header("content-type", "application/json") + .POST( + HttpRequest.BodyPublishers.ofString( + """{"query": "SELECT id, status, epoch FROM sys_invocation WHERE id = '$invocationId'"}""")) + .build() + + // Send the request and get the response + val response = + HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await() + + // Parse the response using Kotlin serialization + return sysQueryJson.decodeFromString(response.body()) +} diff --git a/src/main/openapi/admin.json b/src/main/openapi/admin.json index 79d02c00..a4ccd16f 100644 --- a/src/main/openapi/admin.json +++ b/src/main/openapi/admin.json @@ -2,7 +2,8 @@ "openapi": "3.0.0", "info": { "title": "Admin API", - "version": "1.3.0-rc.1" + "description": "This API exposes the admin operations of a Restate cluster, such as registering new service deployments, interacting with running invocations, register Kafka subscriptions, retrieve service metadata. For an overview, check out the [Operate documentation](https://docs.restate.dev/operate/). If you're looking for how to call your services, check out the [Ingress HTTP API](https://docs.restate.dev/invoke/http) instead.", + "version": "1.3.3-dev" }, "paths": { "/cluster-health": { @@ -114,6 +115,9 @@ ], "summary": "Create deployment", "description": "Create deployment. Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. If the deployment is already registered, this method will fail unless `force` is set to `true`.", + "externalDocs": { + "url": "https://docs.restate.dev/operate/registration" + }, "operationId": "create_deployment", "requestBody": { "content": { @@ -297,6 +301,9 @@ ], "summary": "Update deployment", "description": "Update deployment. Invokes the endpoint and replaces the existing deployment metadata with the discovered information. This is a dangerous operation that should be used only when there are failing invocations on the deployment that cannot be resolved any other way. Sense checks are applied to test that the new deployment is sufficiently similar to the old one.", + "externalDocs": { + "url": "https://docs.restate.dev/operate/versioning" + }, "operationId": "update_deployment", "parameters": [ { @@ -510,7 +517,7 @@ "invocation" ], "summary": "Delete an invocation", - "description": "Delete the given invocation. By default, an invocation is terminated by gracefully cancelling it. This ensures virtual object state consistency. Alternatively, an invocation can be killed which does not guarantee consistency for virtual object instance state, in-flight invocations to other services, etc. A stored completed invocation can also be purged", + "description": "Use kill_invocation/cancel_invocation/purge_invocation instead.", "operationId": "delete_invocation", "parameters": [ { @@ -546,7 +553,427 @@ } } }, - "403": { + "403": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "404": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "409": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + } + }, + "deprecated": true + } + }, + "/invocations/{invocation_id}/cancel": { + "patch": { + "tags": [ + "invocation" + ], + "summary": "Cancel an invocation", + "description": "Cancel the given invocation. Canceling an invocation allows it to free any resources it is holding and roll back any changes it has made so far, running compensation code. For more details, checkout https://docs.restate.dev/guides/sagas", + "externalDocs": { + "url": "https://docs.restate.dev/guides/sagas" + }, + "operationId": "cancel_invocation", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "202": { + "description": "Accepted" + }, + "404 Not Found": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503 Service Unavailable": { + "description": "Error when routing the request within restate.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "400 Bad Request": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "409 Conflict": { + "description": "The invocation was already completed, so it cannot be cancelled nor killed. You can instead purge the invocation, in order for restate to forget it.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + } + } + } + }, + "/invocations/{invocation_id}/kill": { + "patch": { + "tags": [ + "invocation" + ], + "summary": "Kill an invocation", + "description": "Kill the given invocation. This does not guarantee consistency for virtual object instance state, in-flight invocations to other services, etc.", + "operationId": "kill_invocation", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "" + }, + "404 Not Found": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503 Service Unavailable": { + "description": "Error when routing the request within restate.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "400 Bad Request": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "409 Conflict": { + "description": "The invocation was already completed, so it cannot be cancelled nor killed. You can instead purge the invocation, in order for restate to forget it.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + } + } + } + }, + "/invocations/{invocation_id}/purge": { + "patch": { + "tags": [ + "invocation" + ], + "summary": "Purge an invocation", + "description": "Purge the given invocation. This cleanups all the state for the given invocation, including its journal. This command applies only to completed invocations.", + "operationId": "purge_invocation", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "epoch", + "in": "query", + "description": "Remove the specific epoch. If not provided, epoch 0 will be removed. When removing the latest epoch, all the previous epochs will be cleaned up as well.", + "style": "simple", + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + ], + "responses": { + "200": { + "description": "" + }, + "404 Not Found": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503 Service Unavailable": { + "description": "Error when routing the request within restate.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "400 Bad Request": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "409 Conflict": { + "description": "The invocation is not yet completed. An invocation can be purged only when completed.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + } + } + } + }, + "/invocations/{invocation_id}/purge-journal": { + "patch": { + "tags": [ + "invocation" + ], + "summary": "Purge an invocation journal", + "description": "Purge the given invocation journal. This cleanups only the journal for the given invocation, retaining the metadata. This command applies only to completed invocations.", + "operationId": "purge_journal", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "epoch", + "in": "query", + "description": "Remove the specific epoch. If not provided, epoch 0 will be removed. When removing the latest epoch, all the previous epochs will be cleaned up as well.", + "style": "simple", + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + ], + "responses": { + "200": { + "description": "" + }, + "404 Not Found": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503 Service Unavailable": { + "description": "Error when routing the request within restate.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "400 Bad Request": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "409 Conflict": { + "description": "The invocation is not yet completed. An invocation can be purged only when completed.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + } + } + } + }, + "/invocations/{invocation_id}/restart": { + "patch": { + "tags": [ + "invocation" + ], + "summary": "Restart an invocation", + "description": "Restart the given invocation. This will restart the invocation, given its input is available.", + "operationId": "restart_invocation", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "if_running", + "in": "query", + "description": "What to do if the invocation is still running. By default, the running invocation will be killed.", + "style": "simple", + "schema": { + "$ref": "#/components/schemas/RestartInvocationIfRunning" + } + }, + { + "name": "previous_attempt_retention", + "in": "query", + "description": "If set, it will override the configured completion_retention/journal_retention when the invocation was executed the first time. If none of the completion_retention/journal_retention are configured, and neither this previous_attempt_retention, then the previous attempt won't be retained at all. Can be configured using humantime format or ISO8601.", + "style": "simple", + "schema": { + "type": "string" + } + }, + { + "name": "apply_to_workflow_run", + "in": "query", + "description": "What to do in case of restarting a workflow run. By default, clears all promises and state.", + "style": "simple", + "schema": { + "$ref": "#/components/schemas/RestartInvocationApplyToWorkflowRun" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RestartInvocationResponse" + } + } + } + }, + "404 Not Found": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "503 Service Unavailable": { + "description": "Error when routing the request within restate.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } + }, + "400 Bad Request": { "description": "", "content": { "application/json": { @@ -556,8 +983,8 @@ } } }, - "404": { - "description": "", + "409 Conflict": { + "description": "The invocation is still running. An invocation can be restarted only when completed, or if the query parameter if_running=kill is provided.", "content": { "application/json": { "schema": { @@ -566,8 +993,8 @@ } } }, - "409": { - "description": "", + "422 Unprocessable Entity": { + "description": "Restarting the invocation is not supported, because it was started using the old service protocol.", "content": { "application/json": { "schema": { @@ -576,8 +1003,8 @@ } } }, - "500": { - "description": "", + "410 Gone": { + "description": "The invocation cannot be restarted because the input is not available. In order to restart an invocation, the journal must be available in order to read the input again. Journal can be retained after completion by enabling journal retention.", "content": { "application/json": { "schema": { @@ -586,8 +1013,8 @@ } } }, - "503": { - "description": "", + "425 Too Early": { + "description": "The invocation cannot be restarted because it's not running yet, meaning it might have been scheduled or inboxed.", "content": { "application/json": { "schema": { @@ -1331,6 +1758,9 @@ ], "summary": "Create subscription", "description": "Create subscription.", + "externalDocs": { + "url": "https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions" + }, "operationId": "create_subscription", "requestBody": { "content": { @@ -1690,28 +2120,175 @@ } }, "DeploymentResponse": { - "type": "object", - "required": [ - "id", - "services" - ], - "properties": { - "id": { - "$ref": "#/components/schemas/String" + "anyOf": [ + { + "title": "HttpDeploymentResponse", + "description": "Deployment response for HTTP deployments", + "type": "object", + "required": [ + "created_at", + "http_version", + "id", + "max_protocol_version", + "min_protocol_version", + "protocol_type", + "services", + "uri" + ], + "properties": { + "id": { + "title": "Deployment ID", + "allOf": [ + { + "$ref": "#/components/schemas/String" + } + ] + }, + "uri": { + "title": "Deployment URI", + "description": "URI used to invoke this service deployment.", + "type": "string" + }, + "protocol_type": { + "title": "Protocol Type", + "description": "Protocol type used to invoke this service deployment.", + "allOf": [ + { + "$ref": "#/components/schemas/ProtocolType" + } + ] + }, + "http_version": { + "title": "HTTP Version", + "description": "HTTP Version used to invoke this service deployment.", + "type": "string" + }, + "additional_headers": { + "title": "Additional headers", + "description": "Additional headers used to invoke this service deployment.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "created_at": { + "type": "string" + }, + "min_protocol_version": { + "title": "Minimum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "type": "integer", + "format": "int32" + }, + "max_protocol_version": { + "title": "Maximum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "type": "integer", + "format": "int32" + }, + "sdk_version": { + "title": "SDK version", + "description": "SDK library and version declared during registration.", + "type": "string", + "nullable": true + }, + "services": { + "title": "Services", + "description": "List of services exposed by this deployment.", + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceNameRevPair" + } + } + } }, - "services": { - "title": "Services", - "description": "List of services exposed by this deployment.", - "type": "array", - "items": { - "$ref": "#/components/schemas/ServiceNameRevPair" + { + "title": "LambdaDeploymentResponse", + "description": "Deployment response for Lambda deployments", + "type": "object", + "required": [ + "arn", + "created_at", + "id", + "max_protocol_version", + "min_protocol_version", + "services" + ], + "properties": { + "id": { + "title": "Deployment ID", + "allOf": [ + { + "$ref": "#/components/schemas/String" + } + ] + }, + "arn": { + "title": "Lambda ARN", + "description": "Lambda ARN used to invoke this service deployment.", + "allOf": [ + { + "$ref": "#/components/schemas/LambdaARN" + } + ] + }, + "assume_role_arn": { + "title": "Assume role ARN", + "description": "Assume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details.", + "type": "string", + "nullable": true + }, + "additional_headers": { + "title": "Additional headers", + "description": "Additional headers used to invoke this service deployment.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "created_at": { + "type": "string" + }, + "min_protocol_version": { + "title": "Minimum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "type": "integer", + "format": "int32" + }, + "max_protocol_version": { + "title": "Maximum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "type": "integer", + "format": "int32" + }, + "sdk_version": { + "title": "SDK version", + "description": "SDK library and version declared during registration.", + "type": "string", + "nullable": true + }, + "services": { + "title": "Services", + "description": "List of services exposed by this deployment.", + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceNameRevPair" + } + } } } - } + ] }, "String": { "type": "string" }, + "ProtocolType": { + "type": "string", + "enum": [ + "RequestResponse", + "BidiStream" + ] + }, "ServiceNameRevPair": { "type": "object", "required": [ @@ -1729,13 +2306,6 @@ } } }, - "ProtocolType": { - "type": "string", - "enum": [ - "RequestResponse", - "BidiStream" - ] - }, "LambdaARN": { "type": "string", "format": "arn" @@ -1743,6 +2313,8 @@ "RegisterDeploymentRequest": { "anyOf": [ { + "title": "RegisterHttpDeploymentRequest", + "description": "Register HTTP deployment request", "type": "object", "required": [ "uri" @@ -1783,6 +2355,8 @@ } }, { + "title": "RegisterLambdaDeploymentRequest", + "description": "Register Lambda deployment request", "type": "object", "required": [ "arn" @@ -1839,6 +2413,26 @@ "items": { "$ref": "#/components/schemas/ServiceMetadata" } + }, + "min_protocol_version": { + "title": "Minimum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "default": 0, + "type": "integer", + "format": "int32" + }, + "max_protocol_version": { + "title": "Maximum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", + "default": 0, + "type": "integer", + "format": "int32" + }, + "sdk_version": { + "title": "SDK version", + "description": "SDK library and version declared during registration.", + "type": "string", + "nullable": true } } }, @@ -1847,7 +2441,6 @@ "required": [ "deployment_id", "handlers", - "idempotency_retention", "name", "public", "revision", @@ -1902,7 +2495,8 @@ "idempotency_retention": { "title": "Idempotency retention", "description": "The retention duration of idempotent requests for this service.", - "type": "string" + "type": "string", + "nullable": true }, "workflow_completion_retention": { "title": "Workflow completion retention", @@ -1910,6 +2504,12 @@ "type": "string", "nullable": true }, + "journal_retention": { + "title": "Journal retention", + "description": "The journal retention. When set, this applies to all requests to all handlers of this service.\n\nIn case the request has an idempotency key, the `idempotency_retention` caps the maximum `journal_retention` time. In case the request targets a workflow handler, the `workflow_completion_retention` caps the maximum `journal_retention` time.", + "type": "string", + "nullable": true + }, "inactivity_timeout": { "title": "Inactivity timeout", "description": "This timer guards against stalled service/handler invocations. Once it expires, Restate triggers a graceful termination by asking the service invocation to suspend (which preserves intermediate progress).\n\nThe 'abort timeout' is used to abort the invocation, in case it doesn't react to the request to suspend.\n\nCan be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.\n\nThis overrides the default inactivity timeout set in invoker options.", @@ -1921,6 +2521,12 @@ "description": "This timer guards against stalled service/handler invocations that are supposed to terminate. The abort timeout is started after the 'inactivity timeout' has expired and the service/handler invocation has been asked to gracefully terminate. Once the timer expires, it will abort the service/handler invocation.\n\nThis timer potentially **interrupts** user code. If the user code needs longer to gracefully terminate, then this value needs to be set accordingly.\n\nCan be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.\n\nThis overrides the default abort timeout set in invoker options.", "type": "string", "nullable": true + }, + "enable_lazy_state": { + "title": "Enable lazy state", + "description": "If true, lazy state will be enabled for all invocations to this service. This is relevant only for Workflows and Virtual Objects.", + "type": "boolean", + "nullable": true } } }, @@ -1957,6 +2563,42 @@ "type": "string" } }, + "idempotency_retention": { + "title": "Idempotency retention", + "description": "The retention duration of idempotent requests for this service.", + "type": "string", + "nullable": true + }, + "workflow_completion_retention": { + "title": "Workflow completion retention", + "description": "The retention duration of workflows. Only available on workflow services.", + "type": "string", + "nullable": true + }, + "journal_retention": { + "title": "Journal retention", + "description": "The journal retention. When set, this applies to all requests to this handler.\n\nIn case the request has an idempotency key, the `idempotency_retention` caps the maximum `journal_retention` time. In case this handler is a workflow handler, the `workflow_completion_retention` caps the maximum `journal_retention` time.", + "type": "string", + "nullable": true + }, + "inactivity_timeout": { + "title": "Inactivity timeout", + "description": "This timer guards against stalled service/handler invocations. Once it expires, Restate triggers a graceful termination by asking the service invocation to suspend (which preserves intermediate progress).\n\nThe 'abort timeout' is used to abort the invocation, in case it doesn't react to the request to suspend.\n\nCan be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.\n\nThis overrides the default inactivity timeout set in invoker options.", + "type": "string", + "nullable": true + }, + "abort_timeout": { + "title": "Abort timeout", + "description": "This timer guards against stalled service/handler invocations that are supposed to terminate. The abort timeout is started after the 'inactivity timeout' has expired and the service/handler invocation has been asked to gracefully terminate. Once the timer expires, it will abort the service/handler invocation.\n\nThis timer potentially **interrupts** user code. If the user code needs longer to gracefully terminate, then this value needs to be set accordingly.\n\nCan be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.\n\nThis overrides the default abort timeout set in invoker options.", + "type": "string", + "nullable": true + }, + "enable_lazy_state": { + "title": "Enable lazy state", + "description": "If true, lazy state will be enabled for all invocations to this service. This is relevant only for Workflows and Virtual Objects.", + "type": "boolean", + "nullable": true + }, "input_description": { "title": "Human readable input description", "description": "If empty, no schema was provided by the user at discovery time.", @@ -1996,29 +2638,52 @@ ] }, "DetailedDeploymentResponse": { - "type": "object", "anyOf": [ { + "title": "HttpDetailedDeploymentResponse", + "description": "Detailed deployment response for HTTP deployments", "type": "object", "required": [ "created_at", "http_version", + "id", "max_protocol_version", "min_protocol_version", "protocol_type", + "services", "uri" ], "properties": { + "id": { + "title": "Deployment ID", + "allOf": [ + { + "$ref": "#/components/schemas/String" + } + ] + }, "uri": { + "title": "Deployment URI", + "description": "URI used to invoke this service deployment.", "type": "string" }, "protocol_type": { - "$ref": "#/components/schemas/ProtocolType" + "title": "Protocol Type", + "description": "Protocol type used to invoke this service deployment.", + "allOf": [ + { + "$ref": "#/components/schemas/ProtocolType" + } + ] }, "http_version": { + "title": "HTTP Version", + "description": "HTTP Version used to invoke this service deployment.", "type": "string" }, "additional_headers": { + "title": "Additional headers", + "description": "Additional headers used to invoke this service deployment.", "type": "object", "additionalProperties": { "type": "string" @@ -2028,32 +2693,72 @@ "type": "string" }, "min_protocol_version": { + "title": "Minimum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", "type": "integer", "format": "int32" }, "max_protocol_version": { + "title": "Maximum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", "type": "integer", "format": "int32" + }, + "sdk_version": { + "title": "SDK version", + "description": "SDK library and version declared during registration.", + "type": "string", + "nullable": true + }, + "services": { + "title": "Services", + "description": "List of services exposed by this deployment.", + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceMetadata" + } } } }, { + "title": "LambdaDetailedDeploymentResponse", + "description": "Detailed deployment response for Lambda deployments", "type": "object", "required": [ "arn", "created_at", + "id", "max_protocol_version", - "min_protocol_version" + "min_protocol_version", + "services" ], "properties": { + "id": { + "title": "Deployment ID", + "allOf": [ + { + "$ref": "#/components/schemas/String" + } + ] + }, "arn": { - "$ref": "#/components/schemas/LambdaARN" + "title": "Lambda ARN", + "description": "Lambda ARN used to invoke this service deployment.", + "allOf": [ + { + "$ref": "#/components/schemas/LambdaARN" + } + ] }, "assume_role_arn": { + "title": "Assume role ARN", + "description": "Assume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details.", "type": "string", "nullable": true }, "additional_headers": { + "title": "Additional headers", + "description": "Additional headers used to invoke this service deployment.", "type": "object", "additionalProperties": { "type": "string" @@ -2063,33 +2768,34 @@ "type": "string" }, "min_protocol_version": { + "title": "Minimum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", "type": "integer", "format": "int32" }, "max_protocol_version": { + "title": "Maximum Service Protocol version", + "description": "During registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version.", "type": "integer", "format": "int32" + }, + "sdk_version": { + "title": "SDK version", + "description": "SDK library and version declared during registration.", + "type": "string", + "nullable": true + }, + "services": { + "title": "Services", + "description": "List of services exposed by this deployment.", + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceMetadata" + } } } } - ], - "required": [ - "id", - "services" - ], - "properties": { - "id": { - "$ref": "#/components/schemas/String" - }, - "services": { - "title": "Services", - "description": "List of services exposed by this deployment.", - "type": "array", - "items": { - "$ref": "#/components/schemas/ServiceMetadata" - } - } - } + ] }, "UpdateDeploymentRequest": { "anyOf": [ @@ -2171,6 +2877,71 @@ "Purge" ] }, + "RestartInvocationResponse": { + "type": "object", + "required": [ + "new_invocation_epoch" + ], + "properties": { + "new_invocation_epoch": { + "description": "The new invocation epoch of the invocation.", + "type": "integer", + "format": "uint32", + "minimum": 0.0 + } + } + }, + "RestartInvocationIfRunning": { + "description": "What to do if the invocation is still running. By default, the running invocation will be killed.", + "oneOf": [ + { + "description": "Kill the invocation, sending a failure to the waiting callers, then restart the invocation.", + "type": "string", + "enum": [ + "kill" + ] + }, + { + "description": "Fail the Restart operation if the invocation is still running.", + "type": "string", + "enum": [ + "fail" + ] + } + ] + }, + "RestartInvocationApplyToWorkflowRun": { + "description": "What to do in case of restarting a workflow run. By default, clears all promises and state.", + "oneOf": [ + { + "type": "string", + "enum": [ + "nothing" + ] + }, + { + "description": "Clear all the promises, retain the state", + "type": "string", + "enum": [ + "clear_all_promises" + ] + }, + { + "description": "Clear all the state, retain the promises", + "type": "string", + "enum": [ + "clear_all_state" + ] + }, + { + "description": "Clear all the promises and state", + "type": "string", + "enum": [ + "clear_all_promises_and_state" + ] + } + ] + }, "ListServicesResponse": { "type": "object", "required": [ @@ -2377,5 +3148,48 @@ } } } + }, + "tags": [ + { + "name": "deployment", + "description": "Service Deployment management" + }, + { + "name": "invocation", + "description": "Invocation management", + "externalDocs": { + "url": "https://docs.restate.dev/operate/invocation" + } + }, + { + "name": "subscription", + "description": "Subscription management", + "externalDocs": { + "url": "https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions" + } + }, + { + "name": "service", + "description": "Service management" + }, + { + "name": "service_handler", + "description": "Service handlers metadata" + }, + { + "name": "cluster_health", + "description": "Cluster health" + }, + { + "name": "health", + "description": "Admin API health" + }, + { + "name": "version", + "description": "API Version" + } + ], + "externalDocs": { + "url": "https://docs.restate.dev/operate/" } }