diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 3650bcbf..d48c1c4b 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ExecutorService workerPool; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { @@ -67,6 +68,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + this.isExecutorServiceManaged = builder.executorService == null; } /** @@ -81,10 +83,10 @@ public void start() { } /** - * Closes the internally managed gRPC channel, if one exists. + * Closes the internally managed gRPC channel and executor service, if one exists. *

- * This method is a no-op if this client object was created using a builder with a gRPC channel object explicitly - * configured. + * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied, + * it is the responsibility of the supplier to take care of them. */ public void close() { this.isNormalShutdown = true; @@ -116,7 +118,7 @@ public void startAndBlock() { logger); // TODO: How do we interrupt manually? - while (!this.workerPool.isShutdown()) { + while (true) { try { GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); @@ -236,17 +238,19 @@ private void closeSideCarChannel() { } private void shutDownWorkerPool() { - if (!this.isNormalShutdown) { - logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); - } + if (this.isExecutorServiceManaged) { + if (!this.isNormalShutdown) { + logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted"); + } - this.workerPool.shutdown(); - try { - if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { - this.workerPool.shutdownNow(); + this.workerPool.shutdown(); + try { + if (!this.workerPool.awaitTermination(60, TimeUnit.SECONDS)) { + this.workerPool.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); } }