From c2e5245939c92f7f29890f1f63e40838205c928d Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Wed, 7 May 2025 12:59:09 -0500 Subject: [PATCH 1/3] try-catch Signed-off-by: Cassandra Coyle --- .../durabletask/DurableTaskGrpcWorker.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 28a6e358..fbd714d3 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -136,7 +136,17 @@ public void startAndBlock() { .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) .build(); - this.sidecarClient.completeOrchestratorTask(response); + try { + this.sidecarClient.completeOrchestratorTask(response); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress()); + } + } }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); @@ -169,7 +179,17 @@ public void startAndBlock() { responseBuilder.setFailureDetails(failureDetails); } - this.sidecarClient.completeActivityTask(responseBuilder.build()); + try { + this.sidecarClient.completeActivityTask(responseBuilder.build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress()); + } + } }); } else { logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); From 18e5901e8531141e80fde18492937d2a9213d9c3 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Wed, 7 May 2025 13:03:23 -0500 Subject: [PATCH 2/3] log unexpected shutdown Signed-off-by: Cassandra Coyle --- .../com/microsoft/durabletask/DurableTaskGrpcWorker.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index fbd714d3..bc89352b 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private volatile boolean isNormalShutdown = false; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); @@ -218,6 +219,7 @@ public void startAndBlock() { * Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed. */ public void stop() { + this.isNormalShutdown = true; this.close(); } @@ -234,7 +236,9 @@ private void closeSideCarChannel() { } private void shutDownWorkerPool() { - logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted"); + if (!this.isNormalShutdown) { + logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); + } this.workerPool.shutdown(); try { From f7ddbbade335556ca2af9699ce2261e7371451b9 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Wed, 7 May 2025 13:20:33 -0500 Subject: [PATCH 3/3] move shutdown to close() Signed-off-by: Cassandra Coyle --- .../java/com/microsoft/durabletask/DurableTaskGrpcWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index bc89352b..3650bcbf 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -87,6 +87,7 @@ public void start() { * configured. */ public void close() { + this.isNormalShutdown = true; this.shutDownWorkerPool(); this.closeSideCarChannel(); } @@ -219,7 +220,6 @@ public void startAndBlock() { * Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed. */ public void stop() { - this.isNormalShutdown = true; this.close(); }