From 57e5e684aca5e16907e866bdf35b5813e0543491 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 22 Jan 2026 15:00:06 +0100 Subject: [PATCH 01/10] feat: Add workflow versioning Signed-off-by: Javier Aliaga --- .../durabletask/DurableTaskGrpcWorker.java | 6 +- .../DurableTaskGrpcWorkerBuilder.java | 19 ++-- .../dapr/durabletask/OrchestrationRunner.java | 17 +++- .../OrchestrationRuntimeStatus.java | 9 +- .../durabletask/TaskOrchestrationContext.java | 9 ++ .../TaskOrchestrationExecutor.java | 93 +++++++++++++++++-- .../durabletask/TaskOrchestratorResult.java | 10 +- .../TaskOrchestrationFactories.java | 70 ++++++++++++++ .../TaskOrchestrationFactory.java | 8 +- .../VersionNotRegisteredException.java | 4 + .../dapr/durabletask/IntegrationTestBase.java | 11 +++ 11 files changed, 228 insertions(+), 28 deletions(-) create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java rename durabletask-client/src/main/java/io/dapr/durabletask/{ => orchestration}/TaskOrchestrationFactory.java (87%) create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index eb3be6bb9a..ebbcfa3f55 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -17,6 +17,7 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService; import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -42,7 +43,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); - private final HashMap orchestrationFactories = new HashMap<>(); + private final TaskOrchestrationFactories orchestrationFactories; + private final HashMap activityFactories = new HashMap<>(); private final ManagedChannel managedSidecarChannel; @@ -57,7 +59,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { - this.orchestrationFactories.putAll(builder.orchestrationFactories); + this.orchestrationFactories = builder.orchestrationFactories; this.activityFactories.putAll(builder.activityFactories); this.appId = builder.appId; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index 0d3ebf2274..67a128f32d 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -13,6 +13,8 @@ package io.dapr.durabletask; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import io.grpc.Channel; import java.time.Duration; @@ -24,7 +26,7 @@ * */ public final class DurableTaskGrpcWorkerBuilder { - final HashMap orchestrationFactories = new HashMap<>(); + TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories(); final HashMap activityFactories = new HashMap<>(); int port; Channel channel; @@ -40,20 +42,11 @@ public final class DurableTaskGrpcWorkerBuilder { * @return this builder object */ public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task orchestration name is required."); - } - - if (this.orchestrationFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task orchestration factory named %s is already registered.", key)); - } - - this.orchestrationFactories.put(key, factory); + this.orchestrationFactories.addOrchestration(factory); return this; } + /** * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. * @@ -161,4 +154,6 @@ public DurableTaskGrpcWorkerBuilder appId(String appId) { public DurableTaskGrpcWorker build() { return new DurableTaskGrpcWorker(this); } + + } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java index 22b2154608..bbb9814a86 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java @@ -16,10 +16,11 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.StringValue; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import java.time.Duration; import java.util.Base64; -import java.util.HashMap; import java.util.logging.Logger; /** @@ -134,8 +135,8 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati } // Register the passed orchestration as the default ("*") orchestration - HashMap orchestrationFactories = new HashMap<>(); - orchestrationFactories.put("*", new TaskOrchestrationFactory() { + TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories(); + orchestrationFactories.addOrchestration(new TaskOrchestrationFactory() { @Override public String getName() { return "*"; @@ -145,6 +146,16 @@ public String getName() { public TaskOrchestration create() { return orchestration; } + + @Override + public String getVersionName() { + return ""; + } + + @Override + public Boolean isLatestVersion() { + return false; + } }); TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor( diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java index 1bdd33ab38..e9530ae815 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java @@ -68,7 +68,12 @@ public enum OrchestrationRuntimeStatus { /** * The orchestration is in a suspended state. */ - SUSPENDED; + SUSPENDED, + + /** + * The orchestration is in a stalled state. + */ + STALLED; static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.OrchestrationStatus status) { switch (status) { @@ -88,6 +93,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.Orchestration return PENDING; case ORCHESTRATION_STATUS_SUSPENDED: return SUSPENDED; + case ORCHESTRATION_STATUS_STALLED: + return STALLED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index df0c95ec82..3d5d06027f 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -326,6 +326,15 @@ default void continueAsNew(Object input) { this.continueAsNew(input, true); } + /** + * Check if the given patch name can be applied to the orchestration. + * + * @param patchName The name of the patch to check. + * @return True if the given patch name can be applied to the orchestration, False otherwise. + */ + + boolean isPatched(String patchName); + /** * Restarts the orchestration with a new input and clears its history. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 7a3436b036..5d08dfef57 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -19,7 +19,11 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService.ScheduleTaskAction.Builder; import io.dapr.durabletask.interruption.ContinueAsNewInterruption; import io.dapr.durabletask.interruption.OrchestratorBlockedException; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; +import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; import io.dapr.durabletask.util.UuidGenerator; +import io.opentelemetry.api.internal.StringUtils; import javax.annotation.Nullable; import java.time.Duration; @@ -47,14 +51,14 @@ final class TaskOrchestrationExecutor { private static final String EMPTY_STRING = ""; - private final HashMap orchestrationFactories; + private final TaskOrchestrationFactories orchestrationFactories; private final DataConverter dataConverter; private final Logger logger; private final Duration maximumTimerInterval; private final String appId; public TaskOrchestrationExecutor( - HashMap orchestrationFactories, + TaskOrchestrationFactories orchestrationFactories, DataConverter dataConverter, Duration maximumTimerInterval, Logger logger, @@ -79,6 +83,9 @@ public TaskOrchestratorResult execute(List pas } completed = true; logger.finest("The orchestrator execution completed normally"); + } catch (VersionNotRegisteredException versionNotRegisteredException) { + logger.warning("The orchestrator version is not registered: " + versionNotRegisteredException.toString()); + context.setVersionNotRegistered(); } catch (OrchestratorBlockedException orchestratorBlockedException) { logger.fine("The orchestrator has yielded and will await for new events."); } catch (ContinueAsNewInterruption continueAsNewInterruption) { @@ -87,7 +94,7 @@ public TaskOrchestratorResult execute(List pas } catch (Exception e) { // The orchestrator threw an unhandled exception - fail it // TODO: What's the right way to log this? - logger.warning("The orchestrator failed with an unhandled exception: " + e.toString()); + logger.warning("The orchestrator failed with an unhandled exception: " + e); context.fail(new FailureDetails(e)); } @@ -97,12 +104,16 @@ public TaskOrchestratorResult execute(List pas context.complete(null); } - return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus()); + return new TaskOrchestratorResult(context.pendingActions.values(), + context.getCustomStatus(), + context.version, + context.encounteredPatches); } private class ContextImplTask implements TaskOrchestrationContext { private String orchestratorName; + private final List encounteredPatches = new ArrayList<>(); private String rawInput; private String instanceId; private Instant currentInstant; @@ -127,6 +138,11 @@ private class ContextImplTask implements TaskOrchestrationContext { private Object continuedAsNewInput; private boolean preserveUnprocessedEvents; private Object customStatus; + private final Map appliedPatches = new HashMap<>(); + private final Map historyPatches = new HashMap<>(); + + private OrchestratorService.OrchestrationVersion orchestratorStartedVersion; + private String version; public ContextImplTask(List pastEvents, List newEvents) { @@ -144,6 +160,7 @@ private void setName(String name) { this.orchestratorName = name; } + private void setInput(String rawInput) { this.rawInput = rawInput; } @@ -363,6 +380,34 @@ public Task callActivity( return this.createAppropriateTask(taskFactory, options); } + @Override + public boolean isPatched(String patchName) { + var isPatched = this.checkPatch(patchName); + if (isPatched) { + this.encounteredPatches.add(patchName); + } + + return isPatched; + } + + public boolean checkPatch(String patchName) { + if (this.appliedPatches.containsKey(patchName)) { + return this.appliedPatches.get(patchName); + } + + if (this.historyPatches.containsKey(patchName)) { + this.appliedPatches.put(patchName, true); + return true; + } + + if (this.isReplaying) { + this.appliedPatches.put(patchName, false); + return false; + } + this.appliedPatches.put(patchName, true); + return true; + } + @Override public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { Helpers.throwIfOrchestratorComplete(this.isComplete); @@ -924,6 +969,7 @@ private void processEvent(OrchestratorService.HistoryEvent e) { case ORCHESTRATORSTARTED: Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); this.setCurrentInstant(instant); + this.orchestratorStartedVersion = e.getOrchestratorStarted().getVersion(); this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started"); break; case ORCHESTRATORCOMPLETED: @@ -938,18 +984,32 @@ private void processEvent(OrchestratorService.HistoryEvent e) { this.logger.fine(() -> this.instanceId + ": Workflow execution started"); this.setAppId(e.getRouter().getSourceAppID()); + if (this.orchestratorStartedVersion != null + && this.orchestratorStartedVersion.getPatchesCount() > 0) { + for (var patch : this.orchestratorStartedVersion.getPatchesList()) { + this.historyPatches.put(patch, true); + } + } + + var versionName = ""; + if (this.orchestratorStartedVersion != null && !StringUtils.isNullOrEmpty(this.orchestratorStartedVersion.getName())) { + versionName = this.orchestratorStartedVersion.getName(); + } + // Create and invoke the workflow orchestrator - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories - .get(executionStarted.getName()); + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory(executionStarted.getName(), versionName); + if (factory == null) { // Try getting the default orchestrator - factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); + factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory("*"); } // TODO: Throw if the factory is null (orchestration by that name doesn't exist) if (factory == null) { throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); } + this.version = factory.getVersionName(); + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; @@ -959,6 +1019,9 @@ private void processEvent(OrchestratorService.HistoryEvent e) { case EXECUTIONTERMINATED: this.handleExecutionTerminated(e); break; + case EXECUTIONSTALLED: + this.logger.fine(() -> this.instanceId + ": Workflow execution stalled"); + break; case TASKSCHEDULED: this.handleTaskScheduled(e); break; @@ -998,6 +1061,22 @@ private void processEvent(OrchestratorService.HistoryEvent e) { } } + public void setVersionNotRegistered() { + this.pendingActions.clear(); + + OrchestratorService.CompleteOrchestrationAction.Builder builder = OrchestratorService.CompleteOrchestrationAction + .newBuilder(); + builder.setOrchestrationStatus(OrchestratorService.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED); + + int id = this.sequenceNumber++; + OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder() + .setId(id) + .setCompleteOrchestration(builder.build()) + .build(); + this.pendingActions.put(id, action); + + } + private class TaskRecord { private final CompletableTask task; private final String taskName; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index 705a41d5c0..9e9d5f35b3 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -17,6 +17,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.List; final class TaskOrchestratorResult { @@ -24,10 +25,15 @@ final class TaskOrchestratorResult { private final String customStatus; - public TaskOrchestratorResult(Collection actions, String customStatus) { + private final String version; + + private final List patches; + + public TaskOrchestratorResult(Collection actions, String customStatus, String version, List patches) { this.actions = Collections.unmodifiableCollection(actions); - ; this.customStatus = customStatus; + this.version = version; + this.patches = patches; } public Collection getActions() { diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java new file mode 100644 index 0000000000..4e9ea8f006 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java @@ -0,0 +1,70 @@ +package io.dapr.durabletask.orchestration; + +import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; + +import java.util.HashMap; + +public class TaskOrchestrationFactories { + final HashMap orchestrationFactories = new HashMap<>(); + final HashMap> versionedOrchestrationFactories = new HashMap<>(); + final HashMap latestVersionOrchestrationFactories = new HashMap<>(); + + public void addOrchestration(TaskOrchestrationFactory factory) { + String key = factory.getName(); + if (this.emptyString(key)) { + throw new IllegalArgumentException("A non-empty task orchestration name is required."); + } + + if (this.orchestrationFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task orchestration factory named %s is already registered.", key)); + } + + if (emptyString(factory.getVersionName())) { + this.orchestrationFactories.put(key, factory); + return; + } + + if (!this.versionedOrchestrationFactories.containsKey(key)) { + this.versionedOrchestrationFactories.put(key, new HashMap<>()); + } else { + if (this.versionedOrchestrationFactories.get(key).containsKey(factory.getVersionName())) { + throw new IllegalArgumentException("The version name " + factory.getVersionName() + "for " + + factory.getName() + " is already registered."); + } + this.versionedOrchestrationFactories.get(key).put(factory.getVersionName(), factory); + if (factory.isLatestVersion()) { + this.latestVersionOrchestrationFactories.put(key, factory.getVersionName()); + } + } + } + + public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName) { + if (this.orchestrationFactories.containsKey(orchestrationName)) { + return this.orchestrationFactories.get(orchestrationName); + } + + return this.getOrchestrationFactory(orchestrationName, ""); + } + + public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName, String versionName) { + if (!this.versionedOrchestrationFactories.containsKey(orchestrationName)) { + return null; + } + + if (this.emptyString(versionName)) { + String latestVersion = this.latestVersionOrchestrationFactories.get(orchestrationName); + return this.versionedOrchestrationFactories.get(orchestrationName).get(latestVersion); + } + + if (this.versionedOrchestrationFactories.get(orchestrationName).containsKey(versionName)) { + return this.versionedOrchestrationFactories.get(orchestrationName).get(versionName); + } + + throw new VersionNotRegisteredException(); + } + + private boolean emptyString(String s) { + return s == null || s.isEmpty(); + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java similarity index 87% rename from durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java rename to durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java index 274813b69f..a5e1b6a3cf 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java @@ -11,7 +11,9 @@ limitations under the License. */ -package io.dapr.durabletask; +package io.dapr.durabletask.orchestration; + +import io.dapr.durabletask.TaskOrchestration; /** * Factory interface for producing {@link TaskOrchestration} implementations. @@ -30,4 +32,8 @@ public interface TaskOrchestrationFactory { * @return the created orchestration instance */ TaskOrchestration create(); + + String getVersionName(); + + Boolean isLatestVersion(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java new file mode 100644 index 0000000000..16e20c9287 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java @@ -0,0 +1,4 @@ +package io.dapr.durabletask.orchestration.exception; + +public class VersionNotRegisteredException extends RuntimeException { +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java b/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java index bbfcde0469..d0a8a8faa6 100644 --- a/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java +++ b/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java @@ -13,6 +13,7 @@ package io.dapr.durabletask; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import org.junit.jupiter.api.AfterEach; import java.time.Duration; @@ -67,6 +68,16 @@ public String getName() { public TaskOrchestration create() { return implementation; } + + @Override + public String getVersionName() { + return ""; + } + + @Override + public Boolean isLatestVersion() { + return false; + } }); return this; } From 4a95a603bbe26c2686e1e96b97d0ca4e112bf034 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 22 Jan 2026 15:21:21 +0100 Subject: [PATCH 02/10] chore: Fix checkstyle issues Signed-off-by: Javier Aliaga --- .../dapr/durabletask/DurableTaskClient.java | 2 +- .../DurableTaskGrpcWorkerBuilder.java | 3 -- .../durabletask/OrchestrationMetadata.java | 2 +- .../main/java/io/dapr/durabletask/Task.java | 2 ++ .../dapr/durabletask/TaskActivityContext.java | 1 - .../dapr/durabletask/TaskFailedException.java | 1 + .../durabletask/TaskOrchestrationContext.java | 18 +++++------ .../TaskOrchestrationExecutor.java | 12 ++++--- .../durabletask/TaskOrchestratorResult.java | 3 +- .../TaskOrchestrationFactories.java | 31 +++++++++++++++++++ .../VersionNotRegisteredException.java | 13 ++++++++ 11 files changed, 67 insertions(+), 21 deletions(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index 42a98dd556..7f87918038 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -145,7 +145,7 @@ public void raiseEvent(String instanceId, String eventName) { * Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object that contains * metadata about the started instance. * - *

A "started" orchestration instance is any instance not in the Pending state.

+ *

A "started" orchestration instance is any instance not in the Pending state.

* *

If an orchestration instance is already running when this method is called, the method will return immediately. *

diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index 67a128f32d..ad60577256 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -46,7 +46,6 @@ public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory fa return this; } - /** * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. * @@ -154,6 +153,4 @@ public DurableTaskGrpcWorkerBuilder appId(String appId) { public DurableTaskGrpcWorker build() { return new DurableTaskGrpcWorker(this); } - - } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java index a0565ba634..7f9285d034 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java @@ -226,7 +226,7 @@ public boolean isCustomStatusFetched() { private T readPayloadAs(Class type, String payload) { if (!this.requestedInputsAndOutputs) { throw new IllegalStateException("This method can only be used when instance metadata is fetched with the option " - + "to include input and output data."); + + "to include input and output data."); } // Note that the Java gRPC implementation converts null protobuf strings into empty Java strings diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/Task.java b/durabletask-client/src/main/java/io/dapr/durabletask/Task.java index a3f3313816..de2f13e871 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/Task.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/Task.java @@ -27,12 +27,14 @@ *
  * Task{@literal <}int{@literal >} activityTask = ctx.callActivity("MyActivity", someInput, int.class);
  * 
+ * *

Orchestrator code uses the {@link #await()} method to block on the completion of the task and retrieve the result. * If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedException}, which * pauses the orchestrator's execution so that it can save its progress into durable storage and schedule any * outstanding work. When the task is complete, the orchestrator will run again from the beginning and the next time * the task's {@code await()} method is called, the result will be returned, or a {@link TaskFailedException} will be * thrown if the result of the task was an unhandled exception.

+ * *

Note that orchestrator code must never catch {@code OrchestratorBlockedException} because doing so can cause the * orchestration instance to get permanently stuck.

* diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index b2043b51ee..7a0d1ed1ee 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -34,7 +34,6 @@ public interface TaskActivityContext { */ T getInput(Class targetType); - /** * Gets the execution id of the current task activity. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java index 377eecb426..5362e830c7 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java @@ -16,6 +16,7 @@ /** * Exception that gets thrown when awaiting a {@link Task} for an activity or sub-orchestration that fails with an * unhandled exception. + * *

Detailed information associated with a particular task failure can be retrieved * using the {@link #getErrorDetails()} method.

*/ diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index 3d5d06027f..97d851b47f 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -326,15 +326,6 @@ default void continueAsNew(Object input) { this.continueAsNew(input, true); } - /** - * Check if the given patch name can be applied to the orchestration. - * - * @param patchName The name of the patch to check. - * @return True if the given patch name can be applied to the orchestration, False otherwise. - */ - - boolean isPatched(String patchName); - /** * Restarts the orchestration with a new input and clears its history. * @@ -361,6 +352,15 @@ default void continueAsNew(Object input) { */ void continueAsNew(Object input, boolean preserveUnprocessedEvents); + /** + * Check if the given patch name can be applied to the orchestration. + * + * @param patchName The name of the patch to check. + * @return True if the given patch name can be applied to the orchestration, False otherwise. + */ + + boolean isPatched(String patchName); + /** * Create a new Uuid that is safe for replay within an orchestration or operation. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 5d08dfef57..822f254d73 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -160,7 +160,6 @@ private void setName(String name) { this.orchestratorName = name; } - private void setInput(String rawInput) { this.rawInput = rawInput; } @@ -483,7 +482,7 @@ public Task callSubOrchestrator( if (input instanceof TaskOptions) { throw new IllegalArgumentException("TaskOptions cannot be used as an input. " - + "Did you call the wrong method overload?"); + + "Did you call the wrong method overload?"); } String serializedInput = this.dataConverter.serialize(input); @@ -992,16 +991,19 @@ private void processEvent(OrchestratorService.HistoryEvent e) { } var versionName = ""; - if (this.orchestratorStartedVersion != null && !StringUtils.isNullOrEmpty(this.orchestratorStartedVersion.getName())) { + if (this.orchestratorStartedVersion != null + && !StringUtils.isNullOrEmpty(this.orchestratorStartedVersion.getName())) { versionName = this.orchestratorStartedVersion.getName(); } // Create and invoke the workflow orchestrator - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory(executionStarted.getName(), versionName); + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories + .getOrchestrationFactory(executionStarted.getName(), versionName); if (factory == null) { // Try getting the default orchestrator - factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory("*"); + factory = TaskOrchestrationExecutor.this.orchestrationFactories + .getOrchestrationFactory("*"); } // TODO: Throw if the factory is null (orchestration by that name doesn't exist) if (factory == null) { diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index 9e9d5f35b3..8243176031 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -29,7 +29,8 @@ final class TaskOrchestratorResult { private final List patches; - public TaskOrchestratorResult(Collection actions, String customStatus, String version, List patches) { + public TaskOrchestratorResult(Collection actions, + String customStatus, String version, List patches) { this.actions = Collections.unmodifiableCollection(actions); this.customStatus = customStatus; this.version = version; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java index 4e9ea8f006..7a5041c061 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java @@ -1,3 +1,16 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + package io.dapr.durabletask.orchestration; import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; @@ -9,6 +22,11 @@ public class TaskOrchestrationFactories { final HashMap> versionedOrchestrationFactories = new HashMap<>(); final HashMap latestVersionOrchestrationFactories = new HashMap<>(); + /** + * Adds a new orchestration factory to the registry. + * + * @param factory the factory to add + */ public void addOrchestration(TaskOrchestrationFactory factory) { String key = factory.getName(); if (this.emptyString(key)) { @@ -39,6 +57,12 @@ public void addOrchestration(TaskOrchestrationFactory factory) { } } + /** + * Gets the orchestration factory for the specified orchestration name. + * + * @param orchestrationName the orchestration name + * @return the orchestration factory + */ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName) { if (this.orchestrationFactories.containsKey(orchestrationName)) { return this.orchestrationFactories.get(orchestrationName); @@ -47,6 +71,13 @@ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName return this.getOrchestrationFactory(orchestrationName, ""); } + /** + * Gets the orchestration factory for the specified orchestration name and version. + * + * @param orchestrationName the orchestration name + * @param versionName the version name + * @return the orchestration factory + */ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName, String versionName) { if (!this.versionedOrchestrationFactories.containsKey(orchestrationName)) { return null; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java index 16e20c9287..f69ad9ea65 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java @@ -1,3 +1,16 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + package io.dapr.durabletask.orchestration.exception; public class VersionNotRegisteredException extends RuntimeException { From e87a02c07809bde3a048889958bbd4c05c20f5bb Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 22 Jan 2026 16:09:58 +0100 Subject: [PATCH 03/10] chore: Propagate changes to workflow-sdk Signed-off-by: Javier Aliaga --- durabletask-client/pom.xml | 4 ++ .../TaskOrchestrationExecutor.java | 4 +- sdk-workflows/pom.xml | 4 ++ .../io/dapr/workflows/WorkflowContext.java | 2 - .../runtime/WorkflowClassWrapper.java | 17 ++++++- .../runtime/WorkflowInstanceWrapper.java | 9 +++- .../runtime/WorkflowRuntimeBuilder.java | 50 +++++++++++++++++-- .../runtime/WorkflowVersionWrapper.java | 42 ++++++++++++++++ 8 files changed, 120 insertions(+), 12 deletions(-) create mode 100644 sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index f91647cb66..c16a50381f 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -59,6 +59,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 + + org.apache.commons + commons-lang3 + io.grpc grpc-testing diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 822f254d73..ae239b15ca 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -23,7 +23,7 @@ import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; import io.dapr.durabletask.util.UuidGenerator; -import io.opentelemetry.api.internal.StringUtils; +import org.apache.commons.lang3.StringUtils; import javax.annotation.Nullable; import java.time.Duration; @@ -992,7 +992,7 @@ private void processEvent(OrchestratorService.HistoryEvent e) { var versionName = ""; if (this.orchestratorStartedVersion != null - && !StringUtils.isNullOrEmpty(this.orchestratorStartedVersion.getName())) { + && !StringUtils.isEmpty(this.orchestratorStartedVersion.getName())) { versionName = this.orchestratorStartedVersion.getName(); } diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index 7fd95807f1..6346811f25 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -22,6 +22,10 @@ dapr-sdk ${project.parent.version} + + org.apache.commons + commons-lang3 + org.mockito mockito-core diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 8608e96937..ddd847b0a2 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -20,7 +20,6 @@ import org.slf4j.Logger; import javax.annotation.Nullable; - import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -43,7 +42,6 @@ public interface WorkflowContext { */ Logger getLogger(); - /** * Gets the name of the current workflow. * diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 73b6cc8168..13f0c65132 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -14,7 +14,6 @@ package io.dapr.workflows.runtime; import io.dapr.durabletask.TaskOrchestration; -import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; import java.lang.reflect.Constructor; @@ -23,11 +22,25 @@ /** * Wrapper for Durable Task Framework orchestration factory. */ -class WorkflowClassWrapper implements TaskOrchestrationFactory { +class WorkflowClassWrapper extends WorkflowVersionWrapper { private final Constructor workflowConstructor; private final String name; public WorkflowClassWrapper(Class clazz) { + super(); + this.name = clazz.getCanonicalName(); + + try { + this.workflowConstructor = clazz.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + String.format("No constructor found for workflow class '%s'.", this.name), e + ); + } + } + + public WorkflowClassWrapper(Class clazz, String versionName, Boolean isLatestVersion) { + super(versionName, isLatestVersion); this.name = clazz.getCanonicalName(); try { diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java index 77a568a386..597d0e864f 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -14,13 +14,12 @@ package io.dapr.workflows.runtime; import io.dapr.durabletask.TaskOrchestration; -import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; /** * Wrapper for Durable Task Framework orchestration factory. */ -class WorkflowInstanceWrapper implements TaskOrchestrationFactory { +class WorkflowInstanceWrapper extends WorkflowVersionWrapper { private final T workflow; private final String name; @@ -29,6 +28,12 @@ public WorkflowInstanceWrapper(T instance) { this.workflow = instance; } + public WorkflowInstanceWrapper(T instance, String versionName, Boolean isLatestVersion) { + super(versionName, isLatestVersion); + this.name = instance.getClass().getCanonicalName(); + this.workflow = instance; + } + @Override public String getName() { return name; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index 12fe62860b..f0d3b1d94a 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -21,6 +21,7 @@ import io.dapr.workflows.internal.ApiTokenClientInterceptor; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,11 +114,31 @@ public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorServic * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(Class clazz) { - this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz)); + return this.registerWorkflow(clazz, "", false); + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param clazz the class being registered + * @param versionName the version name of the workflow + * @param isLatestVersion whether the workflow is the latest version + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(Class clazz, + String versionName, + Boolean isLatestVersion) { + this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz, versionName, isLatestVersion)); this.workflowSet.add(clazz.getCanonicalName()); this.workflows.add(clazz.getSimpleName()); - this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + if (StringUtils.isEmpty(versionName)) { + this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + } else { + this.logger.info("Registered Workflow Version: {} {} - isLatest {}", + clazz.getSimpleName(), versionName, isLatestVersion); + } return this; } @@ -130,13 +151,34 @@ public WorkflowRuntimeBuilder registerWorkflow(Class cla * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(T instance) { + this.registerWorkflow(instance, "", false); + return this; + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param instance the workflow instance being registered + * @param versionName the version name of the workflow + * @param isLatestVersion whether the workflow is the latest version + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(T instance, + String versionName, + Boolean isLatestVersion) { Class clazz = (Class) instance.getClass(); - this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance)); + this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance, versionName, isLatestVersion)); this.workflowSet.add(clazz.getCanonicalName()); this.workflows.add(clazz.getSimpleName()); - this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + if (StringUtils.isEmpty(versionName)) { + this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + } else { + this.logger.info("Registered Workflow Version: {} {} - isLatest {}", + clazz.getSimpleName(), versionName, isLatestVersion); + } return this; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java new file mode 100644 index 0000000000..4683ebc4dd --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; + +public abstract class WorkflowVersionWrapper implements TaskOrchestrationFactory { + private final String versionName; + private final Boolean isLatestVersion; + + public WorkflowVersionWrapper() { + this.versionName = ""; + this.isLatestVersion = false; + } + + public WorkflowVersionWrapper(String versionName, Boolean isLatestVersion) { + this.versionName = versionName; + this.isLatestVersion = isLatestVersion; + } + + @Override + public String getVersionName() { + return versionName; + } + + @Override + public Boolean isLatestVersion() { + return isLatestVersion; + } + +} From 7aa7e2c07d7c2cf58418656d79dc98e20df3b9b1 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 22 Jan 2026 17:13:01 +0100 Subject: [PATCH 04/10] chore: Fix get orchestrator bug Signed-off-by: Javier Aliaga --- .../durabletask/orchestration/TaskOrchestrationFactories.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java index 7a5041c061..972cb838d4 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java @@ -79,6 +79,10 @@ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName * @return the orchestration factory */ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName, String versionName) { + if (this.orchestrationFactories.containsKey(orchestrationName)) { + return this.orchestrationFactories.get(orchestrationName); + } + if (!this.versionedOrchestrationFactories.containsKey(orchestrationName)) { return null; } From e7e8a740ef8e3d2ec94ee8eacef6d239e8d11e4f Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 23 Jan 2026 11:00:41 +0100 Subject: [PATCH 05/10] chore: Update spring boot version Signed-off-by: Javier Aliaga --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 37c6ecea7d..63596d0ed9 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 2.0 1.21.3 - 3.4.9 + 3.4.13 6.2.7 1.7.0 From d0509e36e3e021e91e0d0235b653eb0b39478d85 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 23 Jan 2026 12:58:05 +0100 Subject: [PATCH 06/10] chore: Codecov Signed-off-by: Javier Aliaga --- .../runtime/WorkflowClassWrapperTest.java | 38 ++++++++++++++++++ .../runtime/WorkflowRuntimeBuilderTest.java | 26 ++++++++++-- .../runtime/WorkflowVersionWrapperTest.java | 40 +++++++++++++++++++ 3 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java index fd76cadaf4..bc4ab0fb16 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java @@ -18,8 +18,10 @@ import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,6 +35,22 @@ public WorkflowStub create() { } } + public static abstract class TestErrorWorkflow implements Workflow { + public TestErrorWorkflow(String s){} + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + + public static abstract class TestPrivateWorkflow implements Workflow { + private TestPrivateWorkflow(){} + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + @Test public void getName() { WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class); @@ -53,4 +71,24 @@ public void createWithClass() { verify(mockContext, times(1)).getInstanceId(); } + @Test + public void createWithClassAndVersion() { + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class, "TestWorkflowV1",false); + when(mockContext.getInstanceId()).thenReturn("uuid"); + wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInstanceId(); + } + + @Test + public void createErrorClassAndVersion() { + assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>(TestErrorWorkflow.class)); + assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>(TestErrorWorkflow.class, "TestWorkflowV1",false)); + + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestPrivateWorkflow.class, "TestWorkflowV1",false); + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + assertThrowsExactly(RuntimeException.class, () -> wrapper.create().run(mockContext)); + + } + } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index 98ddffd53d..5d47368258 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -47,11 +47,24 @@ public void registerValidWorkflowClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class)); } + @Test + public void registerValidVersionWorkflowClass() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class,"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class,"testWorkflowV2", true)); + } + @Test public void registerValidWorkflowInstance() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow())); } + @Test + public void registerValidVersionWorkflowInstance() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow(),"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow(),"testWorkflowV2", true)); + } + + @Test public void registerValidWorkflowActivityClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class)); @@ -62,12 +75,15 @@ public void registerValidWorkflowActivityInstance() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(new TestActivity())); } + + @Test public void buildTest() { assertDoesNotThrow(() -> { try { WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build(); System.out.println("WorkflowRuntime created"); + runtime.close(); } catch (Exception e) { throw new RuntimeException(e); } @@ -82,16 +98,18 @@ public void loggingOutputTest() { Logger testLogger = mock(Logger.class); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerWorkflow(TestWorkflow.class)); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerActivity(TestActivity.class)); +var runtimeBuilder = new WorkflowRuntimeBuilder(testLogger); + assertDoesNotThrow(() -> runtimeBuilder.registerWorkflow(TestWorkflow.class)); + assertDoesNotThrow(() -> runtimeBuilder.registerActivity(TestActivity.class)); - WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder(); + var runtime = runtimeBuilder.build(); - WorkflowRuntime runtime = workflowRuntimeBuilder.build(); verify(testLogger, times(1)) .info(eq("Registered Workflow: {}"), eq("TestWorkflow")); verify(testLogger, times(1)) .info(eq("Registered Activity: {}"), eq("TestActivity")); + + runtime.close(); } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java new file mode 100644 index 0000000000..31cebd5efc --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import io.dapr.durabletask.TaskOrchestration; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class WorkflowVersionWrapperTest { + + @Test + void getVersionProperties() { + var versionWrapper = new WorkflowVersionWrapper("A",true) { + @Override + public String getName() { + return "demo"; + } + + @Override + public TaskOrchestration create() { + return null; + } + }; + + assertEquals("A",versionWrapper.getVersionName()); + assertEquals(true, versionWrapper.isLatestVersion()); + } +} \ No newline at end of file From 1d12cf64993b915ff1eab7a831a69b31b2ce6551 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Tue, 27 Jan 2026 14:29:38 +0100 Subject: [PATCH 07/10] wip: Versioning examples Signed-off-by: Javier Aliaga --- .../versioning/VersioningWorker.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java new file mode 100644 index 0000000000..d692556cab --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java @@ -0,0 +1,105 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.versioning; + +import io.dapr.examples.workflows.utils.PropertyUtils; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class VersioningWorker { + + + public static final String ACTIVITY_1 = "Activity1"; + public static final String ACTIVITY_2 = "Activity2"; + public static final String ACTIVITY_3 = "Activity3"; + public static final String ACTIVITY_4 = "Activity4"; + + + + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(FullVersionWorkflowV1.class, "V1", true); + builder.registerActivity(ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return ACTIVITY_1; + })); + builder.registerActivity(ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return ACTIVITY_2; + })); + builder.registerActivity(ACTIVITY_3, (ctx -> { + System.out.println("Activity2 called."); + return ACTIVITY_3; + })); + builder.registerActivity(ACTIVITY_4, (ctx -> { + System.out.println("Activity2 called."); + return ACTIVITY_4; + })); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> runtime.start()); + Thread.sleep(10000); + executor.shutdown(); + builder.registerWorkflow(FullVersionWorkflowV2.class, "V2", true); + + var runtime2 = builder.build(); + runtime2.start(); + } + + private static class FullVersionWorkflowV1 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V1: " + ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersioningWorker.ACTIVITY_1, String.class).await() +", "; + result += ctx.callActivity(VersioningWorker.ACTIVITY_2, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: " + result); + ctx.complete(result); + }; + } + } + + private static class FullVersionWorkflowV2 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V1: " + ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersioningWorker.ACTIVITY_3, String.class).await() +", "; + result += ctx.callActivity(VersioningWorker.ACTIVITY_4, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: " + result); + ctx.complete(result); + }; + } + } +} From 41a8467daec2cc5055446de7e2fade906cd67fde Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 29 Jan 2026 16:49:25 +0100 Subject: [PATCH 08/10] chore: Examples and register workkflow by name Signed-off-by: Javier Aliaga --- .../TaskOrchestrationFactories.java | 30 +++-- .../workflows/utils/PropertyUtils.java | 3 + .../examples/workflows/versioning/README.md | 4 + .../versioning/VersioningWorker.java | 105 ------------------ .../versioning/full/SendEventClient.java | 39 +++++++ .../versioning/full/VersioningClient.java | 38 +++++++ .../versioning/full/VersioningWorkerV1.java | 68 ++++++++++++ .../versioning/full/VersioningWorkerV2.java | 68 ++++++++++++ .../workflows/client/DaprWorkflowClient.java | 58 +++++++++- .../runtime/WorkflowClassWrapper.java | 4 +- .../runtime/WorkflowInstanceWrapper.java | 4 +- .../runtime/WorkflowRuntimeBuilder.java | 59 ++++++---- .../runtime/WorkflowClassWrapperTest.java | 6 +- .../runtime/WorkflowRuntimeBuilderTest.java | 8 +- 14 files changed, 343 insertions(+), 151 deletions(-) create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/README.md delete mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/full/SendEventClient.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningClient.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV1.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV2.java diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java index 972cb838d4..c513b4f1f5 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java @@ -16,8 +16,11 @@ import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; import java.util.HashMap; +import java.util.logging.Logger; public class TaskOrchestrationFactories { + private static final Logger logger = Logger.getLogger(TaskOrchestrationFactories.class.getPackage().getName()); + final HashMap orchestrationFactories = new HashMap<>(); final HashMap> versionedOrchestrationFactories = new HashMap<>(); final HashMap latestVersionOrchestrationFactories = new HashMap<>(); @@ -45,16 +48,20 @@ public void addOrchestration(TaskOrchestrationFactory factory) { if (!this.versionedOrchestrationFactories.containsKey(key)) { this.versionedOrchestrationFactories.put(key, new HashMap<>()); - } else { - if (this.versionedOrchestrationFactories.get(key).containsKey(factory.getVersionName())) { - throw new IllegalArgumentException("The version name " + factory.getVersionName() + "for " - + factory.getName() + " is already registered."); - } - this.versionedOrchestrationFactories.get(key).put(factory.getVersionName(), factory); - if (factory.isLatestVersion()) { - this.latestVersionOrchestrationFactories.put(key, factory.getVersionName()); - } } + + if (this.versionedOrchestrationFactories.get(key).containsKey(factory.getVersionName())) { + throw new IllegalArgumentException("The version name " + factory.getVersionName() + "for " + + factory.getName() + " is already registered."); + } + + this.versionedOrchestrationFactories.get(key).put(factory.getVersionName(), factory); + + if (factory.isLatestVersion()) { + logger.info("Setting latest version for " + key + " to " + factory.getVersionName()); + this.latestVersionOrchestrationFactories.put(key, factory.getVersionName()); + } + } /** @@ -64,6 +71,7 @@ public void addOrchestration(TaskOrchestrationFactory factory) { * @return the orchestration factory */ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName) { + logger.info("Get orchestration factory for " + orchestrationName); if (this.orchestrationFactories.containsKey(orchestrationName)) { return this.orchestrationFactories.get(orchestrationName); } @@ -79,16 +87,20 @@ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName * @return the orchestration factory */ public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName, String versionName) { + logger.info("Get orchestration factory for " + orchestrationName + " version " + versionName); if (this.orchestrationFactories.containsKey(orchestrationName)) { return this.orchestrationFactories.get(orchestrationName); } if (!this.versionedOrchestrationFactories.containsKey(orchestrationName)) { + logger.warning("No orchestration factory registered for " + orchestrationName); return null; } if (this.emptyString(versionName)) { + logger.info("No version specified, returning latest version"); String latestVersion = this.latestVersionOrchestrationFactories.get(orchestrationName); + logger.info("Latest version is " + latestVersion); return this.versionedOrchestrationFactories.get(orchestrationName).get(latestVersion); } diff --git a/examples/src/main/java/io/dapr/examples/workflows/utils/PropertyUtils.java b/examples/src/main/java/io/dapr/examples/workflows/utils/PropertyUtils.java index 9d64e45d36..7ea94e2f45 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/utils/PropertyUtils.java +++ b/examples/src/main/java/io/dapr/examples/workflows/utils/PropertyUtils.java @@ -25,8 +25,11 @@ public static Properties getProperties(String[] args) { properties = new Properties(new HashMap<>() {{ put(Properties.GRPC_PORT, args[0]); }}); + } return properties; } + + } diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/README.md b/examples/src/main/java/io/dapr/examples/workflows/versioning/README.md new file mode 100644 index 0000000000..c4137fcbd8 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/README.md @@ -0,0 +1,4 @@ +dapr run --app-id workerV1 --resources-path ./components/workflows --dapr-grpc-port 50002 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.versioning.full.VersioningWorkerV1 50002 + +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.versioning.full.VersioningClient 50002 +dapr stop --app-id workerV1 \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java deleted file mode 100644 index d692556cab..0000000000 --- a/examples/src/main/java/io/dapr/examples/workflows/versioning/VersioningWorker.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows.versioning; - -import io.dapr.examples.workflows.utils.PropertyUtils; -import io.dapr.workflows.Workflow; -import io.dapr.workflows.WorkflowStub; -import io.dapr.workflows.runtime.WorkflowRuntime; -import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class VersioningWorker { - - - public static final String ACTIVITY_1 = "Activity1"; - public static final String ACTIVITY_2 = "Activity2"; - public static final String ACTIVITY_3 = "Activity3"; - public static final String ACTIVITY_4 = "Activity4"; - - - - /** - * The main method of this app. - * - * @param args The port the app will listen on. - * @throws Exception An Exception. - */ - public static void main(String[] args) throws Exception { - // Register the Workflow with the builder. - WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(FullVersionWorkflowV1.class, "V1", true); - builder.registerActivity(ACTIVITY_1, (ctx -> { - System.out.println("Activity1 called."); - return ACTIVITY_1; - })); - builder.registerActivity(ACTIVITY_2, (ctx -> { - System.out.println("Activity2 called."); - return ACTIVITY_2; - })); - builder.registerActivity(ACTIVITY_3, (ctx -> { - System.out.println("Activity2 called."); - return ACTIVITY_3; - })); - builder.registerActivity(ACTIVITY_4, (ctx -> { - System.out.println("Activity2 called."); - return ACTIVITY_4; - })); - - // Build and then start the workflow runtime pulling and executing tasks - WorkflowRuntime runtime = builder.build(); - System.out.println("Start workflow runtime"); - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(() -> runtime.start()); - Thread.sleep(10000); - executor.shutdown(); - builder.registerWorkflow(FullVersionWorkflowV2.class, "V2", true); - - var runtime2 = builder.build(); - runtime2.start(); - } - - private static class FullVersionWorkflowV1 implements Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow V1: " + ctx.getName()); - - String result = ""; - result += ctx.callActivity(VersioningWorker.ACTIVITY_1, String.class).await() +", "; - result += ctx.callActivity(VersioningWorker.ACTIVITY_2, String.class).await(); - - ctx.getLogger().info("Workflow finished with result: " + result); - ctx.complete(result); - }; - } - } - - private static class FullVersionWorkflowV2 implements Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow V1: " + ctx.getName()); - - String result = ""; - result += ctx.callActivity(VersioningWorker.ACTIVITY_3, String.class).await() +", "; - result += ctx.callActivity(VersioningWorker.ACTIVITY_4, String.class).await(); - - ctx.getLogger().info("Workflow finished with result: " + result); - ctx.complete(result); - }; - } - } -} diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/full/SendEventClient.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/SendEventClient.java new file mode 100644 index 0000000000..3027646ee2 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/SendEventClient.java @@ -0,0 +1,39 @@ +package io.dapr.examples.workflows.versioning.full; + +import io.dapr.examples.workflows.utils.PropertyUtils; +import io.dapr.examples.workflows.utils.RetryUtils; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class SendEventClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) throws InterruptedException { + try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) { + // Schedule an orchestration which will reliably count the number of words in all the given sentences. + String instanceId = RetryUtils.callWithRetry(() -> client.scheduleNewWorkflow( + VersioningWorkerV1.FullVersionWorkflowV1.class), Duration.ofSeconds(60)); + + System.out.printf("Started a new V1 workflow with instance ID: %s%n", instanceId); + + // Block until the orchestration completes. Then print the final status, which includes the output. + WorkflowState workflowState = client.waitForWorkflowCompletion( + instanceId, + Duration.ofSeconds(30), + true); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, + workflowState.readOutputAs(String.class)); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } +} + + diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningClient.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningClient.java new file mode 100644 index 0000000000..fd0d107f46 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningClient.java @@ -0,0 +1,38 @@ +package io.dapr.examples.workflows.versioning.full; + +import io.dapr.examples.workflows.utils.PropertyUtils; +import io.dapr.examples.workflows.utils.RetryUtils; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class VersioningClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) throws InterruptedException { + try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) { + // Schedule an orchestration which will reliably count the number of words in all the given sentences. + String instanceId = RetryUtils.callWithRetry(() -> client.scheduleNewWorkflow("VersioningWorker"), Duration.ofSeconds(60)); + + System.out.printf("Started a new V1 workflow with instance ID: %s%n", instanceId); + + // Block until the orchestration completes. Then print the final status, which includes the output. + WorkflowState workflowState = client.waitForWorkflowCompletion( + instanceId, + Duration.ofSeconds(30), + true); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, + workflowState.readOutputAs(String.class)); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } +} + + diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV1.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV1.java new file mode 100644 index 0000000000..ede78f5425 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV1.java @@ -0,0 +1,68 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.versioning.full; + +import io.dapr.examples.workflows.utils.PropertyUtils; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class VersioningWorkerV1 { + + + public static final String ACTIVITY_1 = "Activity1"; + public static final String ACTIVITY_2 = "Activity2"; + + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow("VersioningWorker", VersioningWorkerV1.FullVersionWorkflowV1.class, "V1", true); + builder.registerActivity(ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return ACTIVITY_1; + })); + builder.registerActivity(ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return ACTIVITY_2; + })); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + runtime.start(); + } + + public static class FullVersionWorkflowV1 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V1: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersioningWorkerV1.ACTIVITY_1, String.class).await() +", "; + ctx.waitForExternalEvent("test").await(); + result += ctx.callActivity(VersioningWorkerV1.ACTIVITY_2, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV2.java b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV2.java new file mode 100644 index 0000000000..e2849c07e7 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/versioning/full/VersioningWorkerV2.java @@ -0,0 +1,68 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.versioning.full; + +import io.dapr.examples.workflows.utils.PropertyUtils; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class VersioningWorkerV2 { + + public static final String ACTIVITY_3 = "Activity3"; + public static final String ACTIVITY_4 = "Activity4"; + + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)) + .registerWorkflow("VersioningWorker", VersioningWorkerV1.FullVersionWorkflowV1.class, "V1", true) + .registerWorkflow("VersioningWorker", FullVersionWorkflowV2.class, "V2", true); + builder.registerActivity(ACTIVITY_3, (ctx -> { + System.out.println("Activity3 called."); + return ACTIVITY_3; + })); + builder.registerActivity(ACTIVITY_4, (ctx -> { + System.out.println("Activity4 called."); + return ACTIVITY_4; + })); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + runtime.start(); + } + + + public static class FullVersionWorkflowV2 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V2: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersioningWorkerV2.ACTIVITY_3, String.class).await() +", "; + result += ctx.callActivity(VersioningWorkerV2.ACTIVITY_4, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 79725c0209..09a2b1c8b4 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -86,7 +86,18 @@ private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcCha * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName()); + return this.scheduleNewWorkflow(clazz.getCanonicalName()); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @return the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(String name) { + return this.innerClient.scheduleNewOrchestrationInstance(name); } /** @@ -98,7 +109,19 @@ public String scheduleNewWorkflow(Class clazz) { * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz, Object input) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), input); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @return the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(String name, Object input) { + return this.innerClient.scheduleNewOrchestrationInstance(name, input); } /** @@ -111,7 +134,20 @@ public String scheduleNewWorkflow(Class clazz, Object in * @return the instanceId parameter value. */ public String scheduleNewWorkflow(Class clazz, Object input, String instanceId) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), input, instanceId); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @param instanceId the unique ID of the orchestration instance to schedule + * @return the instanceId parameter value. + */ + public String scheduleNewWorkflow(String name, Object input, String instanceId) { + return this.innerClient.scheduleNewOrchestrationInstance(name, input, instanceId); } /** @@ -125,7 +161,21 @@ public String scheduleNewWorkflow(Class clazz, Object in public String scheduleNewWorkflow(Class clazz, NewWorkflowOptions options) { NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), + return this.scheduleNewWorkflow(clazz.getCanonicalName(), orchestrationInstanceOptions); + } + + /** + * Schedules a new workflow with a specified set of options for execution. + * + * @param any Workflow type + * @param name Class extending Workflow to start an instance of. + * @param options the options for the new workflow, including input, instance ID, etc. + * @return the instanceId parameter value. + */ + public String scheduleNewWorkflow(String name, NewWorkflowOptions options) { + NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); + + return this.innerClient.scheduleNewOrchestrationInstance(name, orchestrationInstanceOptions); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 13f0c65132..8ac3789f98 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -39,9 +39,9 @@ public WorkflowClassWrapper(Class clazz) { } } - public WorkflowClassWrapper(Class clazz, String versionName, Boolean isLatestVersion) { + public WorkflowClassWrapper(String name, Class clazz, String versionName, Boolean isLatestVersion) { super(versionName, isLatestVersion); - this.name = clazz.getCanonicalName(); + this.name = name; try { this.workflowConstructor = clazz.getDeclaredConstructor(); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java index 597d0e864f..4dbd766246 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -28,9 +28,9 @@ public WorkflowInstanceWrapper(T instance) { this.workflow = instance; } - public WorkflowInstanceWrapper(T instance, String versionName, Boolean isLatestVersion) { + public WorkflowInstanceWrapper(String name, T instance, String versionName, Boolean isLatestVersion) { super(versionName, isLatestVersion); - this.name = instance.getClass().getCanonicalName(); + this.name = name; this.workflow = instance; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index f0d3b1d94a..cec61aeb71 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -81,8 +81,8 @@ public WorkflowRuntime build() { this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService; if (instance == null) { instance = new WorkflowRuntime( - this.builder.withExecutorService(this.executorService).build(), - this.managedChannel, this.executorService); + this.builder.withExecutorService(this.executorService).build(), + this.managedChannel, this.executorService); } } } @@ -114,24 +114,38 @@ public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorServic * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(Class clazz) { - return this.registerWorkflow(clazz, "", false); + return this.registerWorkflow(clazz.getCanonicalName(), clazz, "", false); + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param name the name of the workflow to register + * @param clazz the class being registered + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(String name, Class clazz) { + return this.registerWorkflow(name, clazz, "", false); } /** * Registers a Workflow object. * * @param any Workflow type + * @param name the name of the workflow to register * @param clazz the class being registered * @param versionName the version name of the workflow * @param isLatestVersion whether the workflow is the latest version * @return the WorkflowRuntimeBuilder */ - public WorkflowRuntimeBuilder registerWorkflow(Class clazz, + public WorkflowRuntimeBuilder registerWorkflow(String name, + Class clazz, String versionName, Boolean isLatestVersion) { - this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz, versionName, isLatestVersion)); - this.workflowSet.add(clazz.getCanonicalName()); - this.workflows.add(clazz.getSimpleName()); + this.builder.addOrchestration(new WorkflowClassWrapper<>(name, clazz, versionName, isLatestVersion)); + this.workflowSet.add(name); + this.workflows.add(name); if (StringUtils.isEmpty(versionName)) { this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); @@ -146,12 +160,13 @@ public WorkflowRuntimeBuilder registerWorkflow(Class cla /** * Registers a Workflow object. * - * @param any Workflow type + * @param any Workflow type * @param instance the workflow instance being registered * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(T instance) { - this.registerWorkflow(instance, "", false); + Class clazz = (Class) instance.getClass(); + this.registerWorkflow(clazz.getCanonicalName(), instance, "", false); return this; } @@ -159,25 +174,25 @@ public WorkflowRuntimeBuilder registerWorkflow(T instance) * Registers a Workflow object. * * @param any Workflow type + * @param name the name of the workflow to register * @param instance the workflow instance being registered * @param versionName the version name of the workflow * @param isLatestVersion whether the workflow is the latest version * @return the WorkflowRuntimeBuilder */ - public WorkflowRuntimeBuilder registerWorkflow(T instance, + public WorkflowRuntimeBuilder registerWorkflow(String name, + T instance, String versionName, Boolean isLatestVersion) { - Class clazz = (Class) instance.getClass(); - - this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance, versionName, isLatestVersion)); - this.workflowSet.add(clazz.getCanonicalName()); - this.workflows.add(clazz.getSimpleName()); + this.builder.addOrchestration(new WorkflowInstanceWrapper<>(name, instance, versionName, isLatestVersion)); + this.workflowSet.add(name); + this.workflows.add(name); if (StringUtils.isEmpty(versionName)) { - this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + this.logger.info("Registered Workflow {}: {}", name, instance.getClass()); } else { - this.logger.info("Registered Workflow Version: {} {} - isLatest {}", - clazz.getSimpleName(), versionName, isLatestVersion); + this.logger.info("Registered Workflow Version {}: {} {} - isLatest {}", + name, instance.getClass().getSimpleName(), versionName, isLatestVersion); } return this; @@ -198,7 +213,7 @@ public WorkflowRuntimeBuilder registerActivity(Clas * Registers an Activity object. * * @param any WorkflowActivity type - * @param name Name of the activity to register. + * @param name Name of the activity to register. * @param clazz Class of the activity to register. * @return the WorkflowRuntimeBuilder */ @@ -215,7 +230,7 @@ public WorkflowRuntimeBuilder registerActivity(Stri /** * Registers an Activity object. * - * @param any WorkflowActivity type + * @param any WorkflowActivity type * @param instance the class instance being registered * @return the WorkflowRuntimeBuilder */ @@ -226,8 +241,8 @@ public WorkflowRuntimeBuilder registerActivity(T in /** * Registers an Activity object. * - * @param any WorkflowActivity type - * @param name Name of the activity to register. + * @param any WorkflowActivity type + * @param name Name of the activity to register. * @param instance the class instance being registered * @return the WorkflowRuntimeBuilder */ diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java index bc4ab0fb16..8458739a79 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java @@ -74,7 +74,7 @@ public void createWithClass() { @Test public void createWithClassAndVersion() { TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); - WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class, "TestWorkflowV1",false); + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>("TestWorkflow", TestWorkflow.class, "v1",false); when(mockContext.getInstanceId()).thenReturn("uuid"); wrapper.create().run(mockContext); verify(mockContext, times(1)).getInstanceId(); @@ -83,9 +83,9 @@ public void createWithClassAndVersion() { @Test public void createErrorClassAndVersion() { assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>(TestErrorWorkflow.class)); - assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>(TestErrorWorkflow.class, "TestWorkflowV1",false)); + assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>("TestErrorWorkflow", TestErrorWorkflow.class, "v1",false)); - WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestPrivateWorkflow.class, "TestWorkflowV1",false); + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>("TestPrivateWorkflow", TestPrivateWorkflow.class, "v2",false); TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); assertThrowsExactly(RuntimeException.class, () -> wrapper.create().run(mockContext)); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index 5d47368258..fcac119d84 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -49,8 +49,8 @@ public void registerValidWorkflowClass() { @Test public void registerValidVersionWorkflowClass() { - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class,"testWorkflowV1", false)); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class,"testWorkflowV2", true)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("TestWorkflow", TestWorkflow.class,"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("TestWorkflow", TestWorkflow.class,"testWorkflowV2", true)); } @Test @@ -60,8 +60,8 @@ public void registerValidWorkflowInstance() { @Test public void registerValidVersionWorkflowInstance() { - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow(),"testWorkflowV1", false)); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow(),"testWorkflowV2", true)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("testWorkflowV1", new TestWorkflow(),"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("testWorkflowV2",new TestWorkflow(),"testWorkflowV2", true)); } From bae4d01056893a912e9c995089c473ef8e412f7e Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 29 Jan 2026 17:41:06 +0100 Subject: [PATCH 09/10] chore: Fix versioning Signed-off-by: Javier Aliaga --- .../TaskOrchestrationExecutor.java | 30 +++++++++---------- .../workflows/client/DaprWorkflowClient.java | 7 ++--- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index ae239b15ca..6096c9bdf9 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -106,7 +106,7 @@ public TaskOrchestratorResult execute(List pas return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus(), - context.version, + context.versionName, context.encounteredPatches); } @@ -141,8 +141,9 @@ private class ContextImplTask implements TaskOrchestrationContext { private final Map appliedPatches = new HashMap<>(); private final Map historyPatches = new HashMap<>(); - private OrchestratorService.OrchestrationVersion orchestratorStartedVersion; - private String version; + private String orchestratorVersionName; + + private String versionName; public ContextImplTask(List pastEvents, List newEvents) { @@ -968,7 +969,14 @@ private void processEvent(OrchestratorService.HistoryEvent e) { case ORCHESTRATORSTARTED: Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); this.setCurrentInstant(instant); - this.orchestratorStartedVersion = e.getOrchestratorStarted().getVersion(); + + if (StringUtils.isNotEmpty(e.getOrchestratorStarted().getVersion().getName())) { + this.orchestratorVersionName = e.getOrchestratorStarted().getVersion().getName(); + } + for (var patch : e.getOrchestratorStarted().getVersion().getPatchesList()) { + this.historyPatches.put(patch, true); + } + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started"); break; case ORCHESTRATORCOMPLETED: @@ -983,17 +991,9 @@ private void processEvent(OrchestratorService.HistoryEvent e) { this.logger.fine(() -> this.instanceId + ": Workflow execution started"); this.setAppId(e.getRouter().getSourceAppID()); - if (this.orchestratorStartedVersion != null - && this.orchestratorStartedVersion.getPatchesCount() > 0) { - for (var patch : this.orchestratorStartedVersion.getPatchesList()) { - this.historyPatches.put(patch, true); - } - } - var versionName = ""; - if (this.orchestratorStartedVersion != null - && !StringUtils.isEmpty(this.orchestratorStartedVersion.getName())) { - versionName = this.orchestratorStartedVersion.getName(); + if (!StringUtils.isEmpty(this.orchestratorVersionName)) { + versionName = this.orchestratorVersionName; } // Create and invoke the workflow orchestrator @@ -1010,7 +1010,7 @@ private void processEvent(OrchestratorService.HistoryEvent e) { throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); } - this.version = factory.getVersionName(); + this.versionName = factory.getVersionName(); TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 09a2b1c8b4..5b55d036e1 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -159,22 +159,19 @@ public String scheduleNewWorkflow(String name, Object input * @return the instanceId parameter value. */ public String scheduleNewWorkflow(Class clazz, NewWorkflowOptions options) { - NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); - - return this.scheduleNewWorkflow(clazz.getCanonicalName(), orchestrationInstanceOptions); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), options); } /** * Schedules a new workflow with a specified set of options for execution. * * @param any Workflow type - * @param name Class extending Workflow to start an instance of. + * @param name name of the workflow to schedule * @param options the options for the new workflow, including input, instance ID, etc. * @return the instanceId parameter value. */ public String scheduleNewWorkflow(String name, NewWorkflowOptions options) { NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); - return this.innerClient.scheduleNewOrchestrationInstance(name, orchestrationInstanceOptions); } From 5648faf0075eda983588d2b5c3de11950329af96 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 30 Jan 2026 12:07:25 +0100 Subject: [PATCH 10/10] chore: Integration test Signed-off-by: Javier Aliaga --- .../durabletask/DurableTaskGrpcWorker.java | 4 +- .../full/FullVersioningWorkflowsIT.java | 221 ++++++++++++++++++ .../version/full/VersionedWorkflows.java | 81 +++++++ .../version/full/WorkflowV1Worker.java | 22 ++ .../version/full/WorkflowV2Worker.java | 22 ++ .../workflows/runtime/WorkflowRuntime.java | 1 + 6 files changed, 350 insertions(+), 1 deletion(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index ebbcfa3f55..d142ff4bca 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -117,7 +117,9 @@ public void start() { * */ public void close() { - this.workerThread.interrupt(); + if (this.workerThread != null) { + this.workerThread.interrupt(); + } this.isNormalShutdown = true; this.shutDownWorkerPool(); this.closeSideCarChannel(); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java new file mode 100644 index 0000000000..81f4d58eba --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java @@ -0,0 +1,221 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.workflows.version.full; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.config.Properties; +import io.dapr.it.spring.data.CustomMySQLContainer; +import io.dapr.it.testcontainers.ContainerConstants; +import io.dapr.it.testcontainers.workflows.TestWorkflowsApplication; +import io.dapr.it.testcontainers.workflows.TestWorkflowsConfiguration; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.testcontainers.DaprPlacementContainer; +import io.dapr.testcontainers.DaprSchedulerContainer; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; + +import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_PLACEMENT_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_SCHEDULER_IMAGE_TAG; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest( + webEnvironment = WebEnvironment.RANDOM_PORT, + classes = { + TestWorkflowsConfiguration.class, + TestWorkflowsApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class FullVersioningWorkflowsIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + private static final WaitStrategy MYSQL_WAIT_STRATEGY = Wait + .forLogMessage(".*port: 3306 MySQL Community Server \\(GPL\\).*", 1) + .withStartupTimeout(Duration.of(60, ChronoUnit.SECONDS)); + + private static final String STATE_STORE_DSN = "mysql:password@tcp(mysql:3306)/"; + private static final Map STATE_STORE_PROPERTIES = createStateStoreProperties(); + + @Container + private static final MySQLContainer MY_SQL_CONTAINER = new CustomMySQLContainer<>("mysql:5.7.34") + .withNetworkAliases("mysql") + .withDatabaseName("dapr_db") + .withUsername("mysql") + .withPassword("password") + .withNetwork(DAPR_NETWORK) + .waitingFor(MYSQL_WAIT_STRATEGY); + + @Container + private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("placement") + .withReuse(false); + + @Container + private final static DaprSchedulerContainer sharedSchedulerContainer = new DaprSchedulerContainer(DAPR_SCHEDULER_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("scheduler") + .withReuse(false); + + @Container + private static final DaprContainer DAPR_CONTAINER_V1 = new DaprContainer(DockerImageName.parse("daprio/daprd:1.17.0-rc.2").asCompatibleSubstituteFor("daprio/daprd:1.16.0-rc.5")) + .withAppName("dapr-worker-v1") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("daprV1 -> " +outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + @Container + private static final DaprContainer DAPR_CONTAINER_V2 = new DaprContainer(DockerImageName.parse("daprio/daprd:1.17.0-rc.2").asCompatibleSubstituteFor("daprio/daprd:1.16.0-rc.5")) + .withAppName("dapr-worker-v2") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("daprV2 -> " + outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + @Container + private final static GenericContainer workerV1 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker-v1", + "-Ddapr.grpc.endpoint=dapr-worker-v1:50001", + "-Ddapr.http.endpoint=dapr-worker-v1:3500", + "io.dapr.it.testcontainers.workflows.version.full.WorkflowV2Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V1) + .waitingFor(Wait.forLogMessage(".*WorkerV1 started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("WorkerV1: " + outputFrame.getUtf8String())); + +// This container will be started manually + private final static GenericContainer workerV2 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker-v2", + "-Ddapr.grpc.endpoint=dapr-worker-v2:50001", + "-Ddapr.http.endpoint=dapr-worker-v2:3500", + "io.dapr.it.testcontainers.workflows.version.full.WorkflowV1Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V2) + .waitingFor(Wait.forLogMessage(".*WorkerV2 started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("WorkerV2: " + outputFrame.getUtf8String())); + + + private static Map createStateStoreProperties() { + Map result = new HashMap<>(); + + result.put("keyPrefix", "name"); + result.put("schemaName", "dapr_db"); + result.put("actorStateStore", "true"); + result.put("connectionString", STATE_STORE_DSN); + + return result; + } + + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER_V1::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER_V1::getGrpcEndpoint); + } + + @Test + public void testWorkflows() throws Exception { + DaprWorkflowClient workflowClientV1 = daprWorkflowClient(DAPR_CONTAINER_V1.getHttpEndpoint(), DAPR_CONTAINER_V1.getGrpcEndpoint()); +// Start workflow V1 + String instanceIdV1 = workflowClientV1.scheduleNewWorkflow("VersionWorkflow"); + workflowClientV1.waitForWorkflowStart(instanceIdV1, Duration.ofSeconds(10), false); + + + workerV2.start(); + DaprWorkflowClient workflowClientV2 = daprWorkflowClient(DAPR_CONTAINER_V2.getHttpEndpoint(), DAPR_CONTAINER_V2.getGrpcEndpoint()); + + // Start workflow V2 + String instanceIdV2 = workflowClientV2.scheduleNewWorkflow("VersionWorkflow"); + workflowClientV2.waitForWorkflowStart(instanceIdV2, Duration.ofSeconds(10), false); + +// Continue workflow V1 + workflowClientV1.raiseEvent(instanceIdV1, "test", null); + + // Wait for workflow to complete + Duration timeout = Duration.ofSeconds(10); + WorkflowState workflowStatusV1 = workflowClientV1.waitForWorkflowCompletion(instanceIdV1, timeout, true); + WorkflowState workflowStatusV2 = workflowClientV2.waitForWorkflowCompletion(instanceIdV2, timeout, true); + + assertNotNull(workflowStatusV1); + assertNotNull(workflowStatusV2); + + String resultV1 = workflowStatusV1.readOutputAs(String.class); + assertEquals("Activity1, Activity2", resultV1); + + String resultV2 = workflowStatusV1.readOutputAs(String.class); + assertEquals("Activity3, Activity4", resultV2); + } + + + + public DaprWorkflowClient daprWorkflowClient( + String daprHttpEndpoint, + String daprGrpcEndpoint + ){ + Map overrides = Map.of( + "dapr.http.endpoint", daprHttpEndpoint, + "dapr.grpc.endpoint", daprGrpcEndpoint + ); + + return new DaprWorkflowClient(new Properties(overrides)); + } + + +} + diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java new file mode 100644 index 0000000000..0d835c9be8 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java @@ -0,0 +1,81 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class VersionedWorkflows { + public static final String ACTIVITY_1 = "Activity1"; + public static final String ACTIVITY_2 = "Activity2"; + public static final String ACTIVITY_3 = "Activity3"; + public static final String ACTIVITY_4 = "Activity4"; + + + + public static class FullVersionWorkflowV1 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V1: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_1, String.class).await() +", "; + ctx.waitForExternalEvent("test").await(); + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_2, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + public static class FullVersionWorkflowV2 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V2: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_3, String.class).await() +", "; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_4, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + public static void addWorkflowV1(WorkflowRuntimeBuilder workflowRuntimeBuilder) { + workflowRuntimeBuilder.registerWorkflow("VersionWorkflow", + VersionedWorkflows.FullVersionWorkflowV1.class, + "V1", + true); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return VersionedWorkflows.ACTIVITY_1; + })); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return VersionedWorkflows.ACTIVITY_2; + })); + } + + public static void addWorkflowV2(WorkflowRuntimeBuilder workflowRuntimeBuilder) { + workflowRuntimeBuilder.registerWorkflow("VersionWorkflow", + VersionedWorkflows.FullVersionWorkflowV2.class, + "V2", + true); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_3, (ctx -> { + System.out.println("Activity3 called."); + return VersionedWorkflows.ACTIVITY_3; + })); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_4, (ctx -> { + System.out.println("Activity4 called."); + return VersionedWorkflows.ACTIVITY_4; + })); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java new file mode 100644 index 0000000000..cdca0941b1 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java @@ -0,0 +1,22 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.it.testcontainers.workflows.multiapp.MultiAppWorkflow; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV1Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Workflow V1 Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + VersionedWorkflows.addWorkflowV1(builder); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("WorkerV1 started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java new file mode 100644 index 0000000000..725adcfbab --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java @@ -0,0 +1,22 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV2Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Workflow V2 Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + VersionedWorkflows.addWorkflowV1(builder); + VersionedWorkflows.addWorkflowV2(builder); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("WorkerV2 started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java index 09f831a563..f5d47f3e79 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java @@ -70,6 +70,7 @@ public void start(boolean block) { public void close() { this.shutDownWorkerPool(); this.closeSideCarChannel(); + this.worker.close(); } private void closeSideCarChannel() {