From a574f2109565725a93e3c3a058d313dd038e49b4 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Thu, 3 Apr 2025 13:30:28 -0700 Subject: [PATCH 1/6] add logs --- arroyo/processing/processor.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index cfa0082a..9fb46e53 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -35,6 +35,7 @@ METRICS_FREQUENCY_SEC = 1.0 # In seconds BACKPRESSURE_THRESHOLD = 5.0 # In seconds +LOGGING_FREQUENCY_SEC = 60.0 # In seconds F = TypeVar("F", bound=Callable[[Any], Any]) @@ -169,6 +170,10 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) + self.__last_run_log_ts = time.time() + self.__last_pause_ts = time.time() + self.__last_empty_msg_ts = time.time() + def _close_strategy() -> None: start_close = time.time() @@ -395,6 +400,10 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) + if time.time() - self.__last_run_log_ts >= LOGGING_FREQUENCY_SEC: + logger.info("Arroyo consumer _run_once loop started") + self.__last_run_log_ts = time.time() + message_carried_over = self.__message is not None if not message_carried_over: @@ -408,6 +417,12 @@ def _run_once(self) -> None: self.__metrics_buffer.incr_timing( "arroyo.consumer.poll.time", time.time() - start_poll ) + + if self.__message is None and not self.__is_paused: + if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: + logger.info("Consumer is not paused, but did not receive a message from underlying consumer") + self.__last_empty_msg_ts = time.time() + except RecoverableError: return @@ -477,6 +492,10 @@ def _run_once(self) -> None: # getting revoked by the broker after reaching the max.poll.interval.ms # Polling a paused consumer should never yield a message. assert self.__consumer.poll(0.1) is None + + if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: + logger.info("Consumer is paused, polling") + self.__last_pause_ts = time.time() else: time.sleep(0.01) From 66ed79663b6bd0e74abefca5224227f4db61915b Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Thu, 3 Apr 2025 15:11:37 -0700 Subject: [PATCH 2/6] modify logs --- arroyo/processing/processor.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 9fb46e53..68721f96 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -418,6 +418,7 @@ def _run_once(self) -> None: "arroyo.consumer.poll.time", time.time() - start_poll ) + # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration if self.__message is None and not self.__is_paused: if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: logger.info("Consumer is not paused, but did not receive a message from underlying consumer") @@ -438,6 +439,9 @@ def _run_once(self) -> None: "arroyo.consumer.processing.time", time.time() - start_poll ) if self.__message is not None: + + # Reset the timer + self.__last_empty_msg_ts = time.time() try: start_submit = time.time() message = ( @@ -482,6 +486,9 @@ def _run_once(self) -> None: paused_partitions, ) self.__is_paused = False + + # Reset if we unpause + self.__last_pause_ts = time.time() # unpause paused partitions... just in case a subset is paused self.__metrics_buffer.incr_counter( "arroyo.consumer.resume", 1 @@ -493,6 +500,7 @@ def _run_once(self) -> None: # Polling a paused consumer should never yield a message. assert self.__consumer.poll(0.1) is None + # Records a log if the consumer has been paused for longer than a threshold duration if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: logger.info("Consumer is paused, polling") self.__last_pause_ts = time.time() @@ -508,6 +516,8 @@ def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1) self.__consumer.resume([*self.__consumer.tell().keys()]) self.__is_paused = False + # Reset the timer since we unpaused + self.__last_pause_ts = time.time() # Clear backpressure timestamp if it is set self._clear_backpressure() From 3a51425dba8f33de454de061b2d6c93affa11003 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Fri, 4 Apr 2025 14:11:48 -0700 Subject: [PATCH 3/6] better log --- arroyo/processing/processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 68721f96..8924213b 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -421,7 +421,7 @@ def _run_once(self) -> None: # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration if self.__message is None and not self.__is_paused: if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: - logger.info("Consumer is not paused, but did not receive a message from underlying consumer") + logger.info(f"Consumer is not paused and did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") self.__last_empty_msg_ts = time.time() except RecoverableError: @@ -502,7 +502,7 @@ def _run_once(self) -> None: # Records a log if the consumer has been paused for longer than a threshold duration if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: - logger.info("Consumer is paused, polling") + logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds") self.__last_pause_ts = time.time() else: time.sleep(0.01) From d4dfc610fa5d85e71c4d2c280331d27118ec9dfa Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Fri, 4 Apr 2025 15:13:52 -0700 Subject: [PATCH 4/6] move logs around --- arroyo/processing/processor.py | 42 ++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 8924213b..d57b2525 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -35,7 +35,7 @@ METRICS_FREQUENCY_SEC = 1.0 # In seconds BACKPRESSURE_THRESHOLD = 5.0 # In seconds -LOGGING_FREQUENCY_SEC = 60.0 # In seconds +LOGGING_FREQUENCY_SEC = 180.0 # In seconds F = TypeVar("F", bound=Callable[[Any], Any]) @@ -170,9 +170,9 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) - self.__last_run_log_ts = time.time() - self.__last_pause_ts = time.time() - self.__last_empty_msg_ts = time.time() + self.__last_run_log_ts = time.time() # This is for throttling the logging of each run loop per-consumer + self.__last_pause_ts = None + self.__last_empty_msg_ts = None def _close_strategy() -> None: start_close = time.time() @@ -418,11 +418,24 @@ def _run_once(self) -> None: "arroyo.consumer.poll.time", time.time() - start_poll ) - # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration - if self.__message is None and not self.__is_paused: - if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: - logger.info(f"Consumer is not paused and did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") - self.__last_empty_msg_ts = time.time() + if self.__message is None: + if not self.__is_paused: + if self.__last_empty_msg_ts is None: + self.__last_empty_msg_ts = time.time() + + # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration + if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: + logger.info(f"Consumer is not paused but did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") + self.__last_empty_msg_ts = time.time() + + else: + if self.__last_pause_ts is None: + self.__last_pause_ts = time.time() + + # Records a log if the consumer has been paused for longer than a threshold duration + if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: + logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds") + self.__last_pause_ts = time.time() except RecoverableError: return @@ -441,7 +454,7 @@ def _run_once(self) -> None: if self.__message is not None: # Reset the timer - self.__last_empty_msg_ts = time.time() + self.__last_empty_msg_ts = None try: start_submit = time.time() message = ( @@ -488,7 +501,7 @@ def _run_once(self) -> None: self.__is_paused = False # Reset if we unpause - self.__last_pause_ts = time.time() + self.__last_pause_ts = None # unpause paused partitions... just in case a subset is paused self.__metrics_buffer.incr_counter( "arroyo.consumer.resume", 1 @@ -499,11 +512,6 @@ def _run_once(self) -> None: # getting revoked by the broker after reaching the max.poll.interval.ms # Polling a paused consumer should never yield a message. assert self.__consumer.poll(0.1) is None - - # Records a log if the consumer has been paused for longer than a threshold duration - if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: - logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds") - self.__last_pause_ts = time.time() else: time.sleep(0.01) @@ -517,7 +525,7 @@ def _run_once(self) -> None: self.__consumer.resume([*self.__consumer.tell().keys()]) self.__is_paused = False # Reset the timer since we unpaused - self.__last_pause_ts = time.time() + self.__last_pause_ts = None # Clear backpressure timestamp if it is set self._clear_backpressure() From 6cfe424e7d91e415bb10a258d17b50dd57d3d0ed Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Mon, 7 Apr 2025 10:15:21 -0700 Subject: [PATCH 5/6] typinng --- arroyo/processing/processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index d57b2525..61665b63 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -424,7 +424,7 @@ def _run_once(self) -> None: self.__last_empty_msg_ts = time.time() # Records a log if the consumer has been active but receiving no message from poll() for longer than a threshold duration - if time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: + elif time.time() - self.__last_empty_msg_ts >= LOGGING_FREQUENCY_SEC: logger.info(f"Consumer is not paused but did not receive a message from underlying consumer for {LOGGING_FREQUENCY_SEC} seconds") self.__last_empty_msg_ts = time.time() @@ -433,7 +433,7 @@ def _run_once(self) -> None: self.__last_pause_ts = time.time() # Records a log if the consumer has been paused for longer than a threshold duration - if time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: + elif time.time() - self.__last_pause_ts >= LOGGING_FREQUENCY_SEC: logger.info(f"Consumer has been paused for {LOGGING_FREQUENCY_SEC} seconds") self.__last_pause_ts = time.time() From 99c36ed1e32cbce95c586bfa193fed359e7a0065 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Mon, 7 Apr 2025 10:21:05 -0700 Subject: [PATCH 6/6] typing --- arroyo/processing/processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 61665b63..5a7fbbef 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -171,8 +171,8 @@ def __init__( ) self.__last_run_log_ts = time.time() # This is for throttling the logging of each run loop per-consumer - self.__last_pause_ts = None - self.__last_empty_msg_ts = None + self.__last_pause_ts: Optional[float] = None + self.__last_empty_msg_ts: Optional[float] = None def _close_strategy() -> None: start_close = time.time()