diff --git a/mlflow/utils/async_logging/async_logging_queue.py b/mlflow/utils/async_logging/async_logging_queue.py index 08096a575a08c..c9c2941b83b72 100644 --- a/mlflow/utils/async_logging/async_logging_queue.py +++ b/mlflow/utils/async_logging/async_logging_queue.py @@ -141,23 +141,40 @@ def _fetch_batch_from_queue(self) -> list[RunBatch]: batches = [] if self._queue.empty(): return batches - queue_size = self._queue.qsize() # Estimate the queue's size. - merged_batch = self._queue.get() - for i in range(queue_size - 1): - if self._queue.empty(): + + # Efficiently drain the queue into a local list to minimize locking/queue interaction overhead + local_batches = [] + while True: + try: + local_batches.append(self._queue.get_nowait()) + except Exception: + # When queue is empty, break # `queue_size` is an estimate, so we need to check if the queue is empty. break - batch = self._queue.get() + + if not local_batches: + return batches + + merged_batch = local_batches[0] + + for batch in local_batches[1:]: + # Precompute primitive lengths to avoid repeated `len(...) + ...` allocations + merged_metrics_len = len(merged_batch.metrics) + merged_params_len = len(merged_batch.params) + merged_tags_len = len(merged_batch.tags) + batch_metrics_len = len(batch.metrics) + batch_params_len = len(batch.params) + batch_tags_len = len(batch.tags) + if ( merged_batch.run_id != batch.run_id or ( - len(merged_batch.metrics + merged_batch.params + merged_batch.tags) - + len(batch.metrics + batch.params + batch.tags) - ) - >= _MAX_ITEMS_PER_BATCH - or len(merged_batch.params) + len(batch.params) >= _MAX_PARAMS_PER_BATCH - or len(merged_batch.tags) + len(batch.tags) >= _MAX_TAGS_PER_BATCH + merged_metrics_len + merged_params_len + merged_tags_len + + batch_metrics_len + batch_params_len + batch_tags_len + ) >= _MAX_ITEMS_PER_BATCH + or merged_params_len + batch_params_len >= _MAX_PARAMS_PER_BATCH + or merged_tags_len + batch_tags_len >= _MAX_TAGS_PER_BATCH ): # Make a new batch if the run_id is different or the batch is full. batches.append(merged_batch)