diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index cfa0082a..971d3e32 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -2,6 +2,8 @@ import functools import logging +import os +import signal import time from collections import defaultdict from typing import ( @@ -139,6 +141,7 @@ def __init__( commit_policy: CommitPolicy = ONCE_PER_SECOND, dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None, join_timeout: Optional[float] = None, + run_stuck_detector: bool = False, ) -> None: self.__consumer = consumer self.__processor_factory = processor_factory @@ -169,6 +172,11 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) + self.__stuck = True + + if run_stuck_detector: + self.stuck_detector_run() + def _close_strategy() -> None: start_close = time.time() @@ -348,6 +356,30 @@ def run(self) -> None: logger.info("Processor terminated") raise + def stuck_detector_run(self) -> None: + def report_stuck(*args: Any) -> None: + # send stacktrace to sentry using exc_info + logger.warning("main thread stuck for more than 60 seconds", exc_info=True) + + signal.signal(signal.SIGUSR1, report_stuck) + + import threading + + def f() -> None: + i = 0 + while True: + if self.__stuck: + i += 1 + + if i >= 60: + os.kill(os.getpid(), signal.SIGUSR1) + return + + time.sleep(1) + + t = threading.Thread(target=f, daemon=True) + t.start() + def _clear_backpressure(self) -> None: if self.__backpressure_timestamp is not None: self.__metrics_buffer.incr_timing( @@ -394,6 +426,7 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) + self.__stuck = False message_carried_over = self.__message is not None