Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import functools
import logging
import os
import signal
import time
from collections import defaultdict
from typing import (
Expand Down Expand Up @@ -139,6 +141,7 @@ def __init__(
commit_policy: CommitPolicy = ONCE_PER_SECOND,
dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None,
join_timeout: Optional[float] = None,
run_stuck_detector: bool = False,
) -> None:
self.__consumer = consumer
self.__processor_factory = processor_factory
Expand Down Expand Up @@ -169,6 +172,11 @@ def __init__(
DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None
)

self.__stuck = True

if run_stuck_detector:
self.stuck_detector_run()

def _close_strategy() -> None:
start_close = time.time()

Expand Down Expand Up @@ -348,6 +356,30 @@ def run(self) -> None:
logger.info("Processor terminated")
raise

def stuck_detector_run(self) -> None:
    """Start a background watchdog that reports a stalled main loop.

    Installs a ``SIGUSR1`` handler that logs a warning, then starts a
    daemon thread that samples ``self.__stuck`` once per second.
    ``_run_once`` clears that flag on every iteration; the watchdog sets it
    again after each sample, so the flag is only still set at the next
    sample if ``_run_once`` has not run in between. After 60 consecutive
    samples with the flag set, the thread sends ``SIGUSR1`` to this process
    and exits, so at most one report is produced per call.

    Note: ``signal.signal`` replaces any previously installed ``SIGUSR1``
    handler for the whole process.
    """
    import threading

    def report_stuck(*args: Any) -> None:
        # send stacktrace to sentry using exc_info
        # No exception is active inside a signal handler, so exc_info alone
        # carries no traceback for ordinary logging handlers; stack_info
        # additionally attaches the stack of the thread running this
        # handler to the record.
        logger.warning(
            "main thread stuck for more than 60 seconds",
            exc_info=True,
            stack_info=True,
        )

    signal.signal(signal.SIGUSR1, report_stuck)

    def f() -> None:
        stalled_seconds = 0
        while True:
            if self.__stuck:
                stalled_seconds += 1
            else:
                # The main loop ran since the previous sample: only an
                # uninterrupted stall should count towards the threshold.
                stalled_seconds = 0

            # Re-arm the flag so the next sample reflects only what happens
            # from now on. Without this, the first _run_once call would
            # disable the detector for the rest of the process lifetime.
            self.__stuck = True

            if stalled_seconds >= 60:
                # Signal our own process rather than logging from this
                # thread, so that report_stuck runs as a signal handler.
                os.kill(os.getpid(), signal.SIGUSR1)
                return

            time.sleep(1)

    t = threading.Thread(target=f, daemon=True)
    t.start()

def _clear_backpressure(self) -> None:
if self.__backpressure_timestamp is not None:
self.__metrics_buffer.incr_timing(
Expand Down Expand Up @@ -394,6 +426,7 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:

def _run_once(self) -> None:
self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1)
self.__stuck = False

message_carried_over = self.__message is not None

Expand Down