From b62d3d1a7cab648dbffd45b8860d04c981a678aa Mon Sep 17 00:00:00 2001 From: Maxwell Lou Date: Tue, 3 Jun 2025 11:22:39 -0400 Subject: [PATCH] Fix queue drain bug, causing child processes to hang and never join --- utilities/workers/queue_proxy_wrapper.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/utilities/workers/queue_proxy_wrapper.py b/utilities/workers/queue_proxy_wrapper.py index 633efece..e2467641 100644 --- a/utilities/workers/queue_proxy_wrapper.py +++ b/utilities/workers/queue_proxy_wrapper.py @@ -10,6 +10,7 @@ class QueueProxyWrapper: """ Wrapper for an underlying queue proxy which also stores maxsize. + `maxsize <= 0` means the queue has infinite size. """ __QUEUE_TIMEOUT = 0.1 # seconds @@ -29,8 +30,7 @@ def fill_queue_with_sentinel(self, timeout: float = 0.0) -> None: timeout = self.__QUEUE_TIMEOUT try: - self.queue.put(None, timeout=timeout) - for _ in range(1, self.maxsize): + for _ in range(self.maxsize): self.queue.put(None, timeout=timeout) except queue.Full: return @@ -45,7 +45,8 @@ def drain_queue(self, timeout: float = 0.0) -> None: timeout = self.__QUEUE_TIMEOUT try: - self.queue.get(timeout=timeout) + for _ in range(self.maxsize): + self.queue.get(timeout=timeout) except queue.Empty: return