From d62c53e68b7b8cbc2a2f8579c71a9025021114fa Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 13:58:41 -0700 Subject: [PATCH 1/2] Add managed Signed-off-by: siri-varma --- .../durabletask/DurableTaskGrpcWorker.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 3650bcbf..8a34a35f 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { @@ -67,6 +68,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + this.isExecutorServiceManaged = builder.executorService == null; } /** @@ -81,10 +83,10 @@ public void start() { } /** - * Closes the internally managed gRPC channel, if one exists. + * Closes the internally managed gRPC channel and executor service, if one exists. *

- * This method is a no-op if this client object was created using a builder with a gRPC channel object explicitly - * configured. + * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied, + * it is the responsibility of the supplier to manage them. */ public void close() { this.isNormalShutdown = true; @@ -116,7 +118,7 @@ public void startAndBlock() { logger); // TODO: How do we interrupt manually? - while (!this.workerPool.isShutdown()) { + while (true) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); @@ -236,17 +238,19 @@ private void closeSideCarChannel() { } private void shutDownWorkerPool() { - if (!this.isNormalShutdown) { - logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); - } + if (this.isExecutorServiceManaged) { + if (!this.isNormalShutdown) { + logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); + } - this.workerPool.shutdown(); - try { - if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { - this.workerPool.shutdownNow(); + this.workerPool.shutdown(); + try { + if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { + this.workerPool.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); } } From 8f06a7e5c3b51e6a611dda380e0180a46d996e17 Mon Sep 17 00:00:00 2001 From: siri-varma Date: Wed, 7 May 2025 14:01:40 -0700 Subject: [PATCH 2/2] Add managed Signed-off-by: siri-varma --- .../java/com/microsoft/durabletask/DurableTaskGrpcWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 8a34a35f..d48c1c4b 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -86,7 +86,7 @@ public void start() { * Closes the internally managed gRPC channel and executor service, if one exists. *

* Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied, - * it is the responsibility of the supplier to manage them. + * it is the responsibility of the supplier to take care of them. */ public void close() { this.isNormalShutdown = true;