mlflow/utils/async_logging/async_logging_queue.py (39 changes: 28 additions & 11 deletions)
@@ -141,23 +141,40 @@ def _fetch_batch_from_queue(self) -> list[RunBatch]:
         batches = []
         if self._queue.empty():
             return batches
-        queue_size = self._queue.qsize()  # Estimate the queue's size.
-        merged_batch = self._queue.get()
-        for i in range(queue_size - 1):
-            if self._queue.empty():
-                # `queue_size` is an estimate, so we need to check if the queue is empty.
+
+        # Efficiently drain the queue into a local list to minimize locking/queue interaction overhead
+        local_batches = []
+        while True:
+            try:
+                local_batches.append(self._queue.get_nowait())
+            except Exception:
+                # When queue is empty, break
                 break
-            batch = self._queue.get()
+
+        if not local_batches:
+            return batches
+
+        merged_batch = local_batches[0]
+
+        for batch in local_batches[1:]:
+            # Precompute primitive lengths to avoid repeated `len(...) + ...` allocations
+            merged_metrics_len = len(merged_batch.metrics)
+            merged_params_len = len(merged_batch.params)
+            merged_tags_len = len(merged_batch.tags)
+            batch_metrics_len = len(batch.metrics)
+            batch_params_len = len(batch.params)
+            batch_tags_len = len(batch.tags)
+
             if (
                 merged_batch.run_id != batch.run_id
                 or (
-                    len(merged_batch.metrics + merged_batch.params + merged_batch.tags)
-                    + len(batch.metrics + batch.params + batch.tags)
-                )
-                >= _MAX_ITEMS_PER_BATCH
-                or len(merged_batch.params) + len(batch.params) >= _MAX_PARAMS_PER_BATCH
-                or len(merged_batch.tags) + len(batch.tags) >= _MAX_TAGS_PER_BATCH
+                    merged_metrics_len + merged_params_len + merged_tags_len +
+                    batch_metrics_len + batch_params_len + batch_tags_len
+                ) >= _MAX_ITEMS_PER_BATCH
+                or merged_params_len + batch_params_len >= _MAX_PARAMS_PER_BATCH
+                or merged_tags_len + batch_tags_len >= _MAX_TAGS_PER_BATCH
             ):
                 # Make a new batch if the run_id is different or the batch is full.
                 batches.append(merged_batch)
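
The drain loop added here sidesteps `Queue.qsize()`, which the stdlib docs describe as only approximate when other threads are producing concurrently; `get_nowait()` instead raises `queue.Empty` exactly when nothing is left. A minimal standalone sketch of the same pattern, using only the standard library (the string payloads stand in for MLflow's RunBatch objects):

import queue

def drain(q: queue.Queue) -> list:
    # Take everything currently in the queue without blocking and
    # without trusting the qsize() estimate.
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items

q = queue.Queue()
for i in range(5):
    q.put(f"batch-{i}")
print(drain(q))  # ['batch-0', 'batch-1', 'batch-2', 'batch-3', 'batch-4']

The sketch catches `queue.Empty` specifically; the diff's broader `except Exception` handles the same condition, since `get_nowait()` on a `queue.Queue` raises nothing else here.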
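
The length precomputation matters because the old guard evaluated `len(merged_batch.metrics + merged_batch.params + merged_batch.tags)`, and `list + list` in Python allocates a new concatenated list just so `len()` can count it, once per queued batch. A rough illustration of the cost difference, with hypothetical list sizes:

import timeit

metrics, params, tags = list(range(500)), list(range(200)), list(range(100))

# Old style: builds and discards an 800-element temporary list per call.
concat = timeit.timeit(lambda: len(metrics + params + tags), number=100_000)

# New style: three O(1) length lookups plus integer addition.
summed = timeit.timeit(lambda: len(metrics) + len(params) + len(tags), number=100_000)

print(f"len(a + b + c): {concat:.3f}s  len(a)+len(b)+len(c): {summed:.3f}s")

Exact timings vary by machine, but the summed form performs no allocation at all, which is what the precomputed `*_len` variables in the merge loop exploit.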