From ccd22ec0f06b22df77ed13eb20015872df5099ea Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Tue, 13 Jan 2026 17:28:21 +0100 Subject: [PATCH 1/4] add message chunking to sdk --- netboxlabs/diode/sdk/__init__.py | 6 + netboxlabs/diode/sdk/chunking.py | 111 +++++++++++++++++ tests/test_chunking.py | 200 +++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+) create mode 100644 netboxlabs/diode/sdk/chunking.py create mode 100644 tests/test_chunking.py diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py index ac8d2ff..58798fc 100644 --- a/netboxlabs/diode/sdk/__init__.py +++ b/netboxlabs/diode/sdk/__init__.py @@ -2,6 +2,10 @@ # Copyright 2024 NetBox Labs Inc """NetBox Labs, Diode - SDK.""" +from netboxlabs.diode.sdk.chunking import ( + create_message_chunks, + estimate_message_size, +) from netboxlabs.diode.sdk.client import ( DiodeClient, DiodeDryRunClient, @@ -9,6 +13,8 @@ load_dryrun_entities, ) +assert create_message_chunks +assert estimate_message_size assert DiodeClient assert DiodeDryRunClient assert DiodeOTLPClient diff --git a/netboxlabs/diode/sdk/chunking.py b/netboxlabs/diode/sdk/chunking.py new file mode 100644 index 0000000..4d89813 --- /dev/null +++ b/netboxlabs/diode/sdk/chunking.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# Copyright 2024 NetBox Labs Inc +"""Message chunking utilities for Diode SDK. + +This module provides utilities for chunking large lists of entities into +size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds +the gRPC message size limit. +""" + +from typing import Iterable + +from .diode.v1 import ingester_pb2 + + +def create_message_chunks( + entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0 +) -> list[list[ingester_pb2.Entity]]: + """Create size-aware chunks from entities using greedy bin-packing. + + This function chunks entities to ensure each chunk stays under the specified + size limit. It uses a greedy bin-packing algorithm that accumulates entities + until adding the next entity would exceed the limit, then starts a new chunk. + + The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB + message size limit, accounting for protobuf serialization overhead. + + Args: + entities: Iterable of Entity protobuf messages to chunk + max_chunk_size_mb: Maximum chunk size in MB (default 3.0) + + Returns: + List of entity chunks, each under max_chunk_size_mb. Returns at least + one chunk even if the input is empty. + + Examples: + >>> entities = [entity1, entity2, entity3, ...] + >>> chunks = create_message_chunks(entities) + >>> for chunk in chunks: + ... client.ingest(chunk) + + >>> # Use a custom chunk size + >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5) + """ + # Convert iterable to list if necessary for size estimation + if not isinstance(entities, list): + entities = list(entities) + + if not entities: + return [entities] + + # Convert MB to bytes + max_chunk_size_bytes = int(max_chunk_size_mb * 1024 * 1024) + + # Quick check: if all entities fit in one chunk, return early + total_size = estimate_message_size(entities) + if total_size <= max_chunk_size_bytes: + return [entities] + + # Greedy bin-packing: accumulate entities until limit reached + base_overhead = ingester_pb2.IngestRequest().ByteSize() + chunks = [] + current_chunk: list[ingester_pb2.Entity] = [] + current_chunk_size = base_overhead # Start with overhead for the chunk + + for entity in entities: + entity_size = entity.ByteSize() + projected_size = current_chunk_size + entity_size + + # Check if adding this entity would exceed limit + if current_chunk and projected_size > max_chunk_size_bytes: + # Finalize current chunk and start new one + chunks.append(current_chunk) + current_chunk = [entity] + current_chunk_size = base_overhead + entity_size + else: + # Add entity to current chunk + current_chunk.append(entity) + current_chunk_size = projected_size + + # Add final chunk if not empty + if current_chunk: + chunks.append(current_chunk) + + return chunks if chunks else [entities] + + +def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int: + """Estimate the serialized size of entities in bytes. + + Calculates the total size by summing individual entity sizes plus the + IngestRequest protobuf overhead. + + Args: + entities: Iterable of Entity protobuf messages + + Returns: + Estimated size in bytes including IngestRequest overhead + + Examples: + >>> entities = [entity1, entity2, entity3] + >>> size_bytes = estimate_message_size(entities) + >>> size_mb = size_bytes / (1024 * 1024) + >>> print(f"Estimated size: {size_mb:.2f} MB") + """ + # Convert iterable to list if necessary + if not isinstance(entities, list): + entities = list(entities) + + base_overhead = ingester_pb2.IngestRequest().ByteSize() + entity_sizes_sum = sum(entity.ByteSize() for entity in entities) + return base_overhead + entity_sizes_sum diff --git a/tests/test_chunking.py b/tests/test_chunking.py new file mode 100644 index 0000000..6688271 --- /dev/null +++ b/tests/test_chunking.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# Copyright 2024 NetBox Labs Inc +"""Tests for message chunking utilities.""" + +from unittest.mock import patch + +from netboxlabs.diode.sdk.chunking import create_message_chunks, estimate_message_size +from netboxlabs.diode.sdk.diode.v1 import ingester_pb2 + + +def test_create_message_chunks_empty_list(): + """Test create_message_chunks with an empty entity list.""" + entities = [] + chunks = create_message_chunks(entities) + + assert len(chunks) == 1 + assert chunks[0] == [] + + +def test_create_message_chunks_single_chunk(): + """Test create_message_chunks when entities fit in a single chunk.""" + # Create small mock entities that will fit in one chunk + entities = [] + for i in range(5): + entity = ingester_pb2.Entity() + entity.device.name = f"test_device_{i}" + entities.append(entity) + + # Mock size to be small (under 3 MB default) + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=1024): + chunks = create_message_chunks(entities) + + assert len(chunks) == 1 + assert len(chunks[0]) == 5 + assert chunks[0] == entities + + +def test_create_message_chunks_multiple_chunks(): + """Test create_message_chunks when entities need to be split into multiple chunks.""" + # Create entities that will exceed the target size + entities = [] + for i in range(10): + entity = ingester_pb2.Entity() + entity.device.name = f"test_device_{i}" + entities.append(entity) + + # Mock size to be larger than target (3MB default) + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024): + # Also need to mock ByteSize for individual entities and base overhead + with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000): # ~600KB each + with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100): + chunks = create_message_chunks(entities) + + # Should have multiple chunks + assert len(chunks) > 1 + + # All entities should be present across chunks + total_entities = sum(len(chunk) for chunk in chunks) + assert total_entities == 10 + + # Each chunk should have at least 1 entity + for chunk in chunks: + assert len(chunk) >= 1 + + +def test_create_message_chunks_one_entity_per_chunk(): + """Test create_message_chunks when each entity needs its own chunk.""" + entities = [] + for i in range(3): + entity = ingester_pb2.Entity() + entity.device.name = f"large_device_{i}" + entities.append(entity) + + # Mock very large size to force one entity per chunk + # Each entity is 3.5 MB, forcing one per chunk with 3 MB limit + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=20 * 1024 * 1024): + with patch.object(ingester_pb2.Entity, "ByteSize", return_value=3 * 1024 * 1024 + 500000): + with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100): + chunks = create_message_chunks(entities) + + # Should have 3 chunks with 1 entity each + assert len(chunks) == 3 + for chunk in chunks: + assert len(chunk) == 1 + + +def test_estimate_message_size(): + """Test estimate_message_size method.""" + # Create mock entities + entities = [] + for i in range(3): + entity = ingester_pb2.Entity() + entity.device.name = f"test_device_{i}" + entities.append(entity) + + # Call the function + size = estimate_message_size(entities) + + # Should return a positive integer + assert isinstance(size, int) + assert size > 0 + + +def test_estimate_message_size_empty_list(): + """Test estimate_message_size with an empty entity list.""" + entities = [] + + size = estimate_message_size(entities) + + # Should return base overhead (positive value for protobuf header) + assert isinstance(size, int) + assert size >= 0 + + +def test_create_message_chunks_custom_chunk_size(): + """Test create_message_chunks with a custom chunk size.""" + # Create entities + entities = [] + for i in range(10): + entity = ingester_pb2.Entity() + entity.device.name = f"test_device_{i}" + entities.append(entity) + + # Use 3.5 MB chunk size (like orb-discovery) + # Mock size estimation to return 5 MB (exceeds 3.5 MB limit) + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024): + with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000): # ~600KB each + with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100): + chunks = create_message_chunks(entities, max_chunk_size_mb=3.5) + + # Should have multiple chunks due to size limit + assert len(chunks) > 1 + + # All entities should be present + total_entities = sum(len(chunk) for chunk in chunks) + assert total_entities == 10 + + +def test_create_message_chunks_preserves_order(): + """Test that create_message_chunks preserves entity order.""" + # Create entities with identifiable names + entities = [] + for i in range(20): + entity = ingester_pb2.Entity() + entity.device.name = f"device_{i:03d}" + entities.append(entity) + + # Mock to force multiple chunks + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=10 * 1024 * 1024): + with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000): + with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100): + chunks = create_message_chunks(entities) + + # Flatten chunks and verify order + flattened = [] + for chunk in chunks: + flattened.extend(chunk) + + assert len(flattened) == 20 + for i, entity in enumerate(flattened): + assert entity.device.name == f"device_{i:03d}" + + +def test_create_message_chunks_with_iterable(): + """Test create_message_chunks with a generator/iterator input.""" + # Create generator + def entity_generator(): + for i in range(5): + entity = ingester_pb2.Entity() + entity.device.name = f"test_device_{i}" + yield entity + + # Should work with generator (converted to list internally) + chunks = create_message_chunks(entity_generator()) + + assert len(chunks) >= 1 + total_entities = sum(len(chunk) for chunk in chunks) + assert total_entities == 5 + + +def test_create_message_chunks_single_large_entity(): + """Test create_message_chunks with a single entity that exceeds chunk size. + + This edge case verifies the function doesn't fail when a single entity + is larger than the chunk size limit. + """ + entity = ingester_pb2.Entity() + entity.device.name = "huge_device" + entities = [entity] + + # Mock a very large entity (5 MB) that exceeds 3 MB limit + with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024): + with patch.object(ingester_pb2.Entity, "ByteSize", return_value=5 * 1024 * 1024): + with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100): + chunks = create_message_chunks(entities) + + # Should still return one chunk with the single entity + assert len(chunks) == 1 + assert len(chunks[0]) == 1 + assert chunks[0][0].device.name == "huge_device" From 0b208e290365163f3e06a7cbe4df9558ada3e858 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 14 Jan 2026 13:09:05 +0100 Subject: [PATCH 2/4] add readme doc --- README.md | 72 ++++++++++++++++++++++++++++++++ netboxlabs/diode/sdk/chunking.py | 2 +- tests/test_chunking.py | 2 +- 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0df50d2..40423b7 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,78 @@ client = DiodeClient( ) ``` +### Message chunking + +When ingesting large numbers of entities, you may need to split them into smaller chunks to avoid exceeding the gRPC message size limit for a single `ingest()` call. The SDK provides chunking utilities that automatically split entity lists into appropriately sized chunks. + +#### How it works + +The SDK uses a **greedy bin-packing algorithm** that: +1. Accumulates entities until adding the next entity would exceed the size limit +2. Starts a new chunk when the limit would be exceeded +3. Ensures each chunk stays safely under the configured limit (default: 3 MB) + +#### Basic usage + +```python +from netboxlabs.diode.sdk import DiodeClient, create_message_chunks +from netboxlabs.diode.sdk.ingester import Device, Entity + +with DiodeClient( + target="grpc://localhost:8080/diode", + app_name="my-app", + app_version="1.0.0", +) as client: + # Create a large list of entities + entities = [] + for i in range(10000): + device = Device( + name=f"Device {i}", + device_type="Device Type A", + site="Site ABC", + role="Role ABC", + ) + entities.append(Entity(device=device)) + + # Split into chunks (default 3 MB per chunk), then ingest each chunk separately. + for chunk in create_message_chunks(entities): + client.ingest(entities=chunk) +``` + +#### Custom chunk size + +You can customize the chunk size if needed: + +```python +from netboxlabs.diode.sdk import create_message_chunks + +# Use a larger chunk size (3.5 MB) +chunks = create_message_chunks(entities, max_chunk_size_mb=3.5) + +# Use a smaller chunk size for conservative chunking (2 MB) +chunks = create_message_chunks(entities, max_chunk_size_mb=2.0) +``` + +#### Estimating message size + +You can estimate the serialized size of entities before chunking: + +```python +from netboxlabs.diode.sdk import estimate_message_size + +size_bytes = estimate_message_size(entities) +size_mb = size_bytes / (1024 * 1024) +print(f"Total size: {size_mb:.2f} MB") + +# Decide whether chunking is needed +if size_mb > 3.0: + chunks = create_message_chunks(entities) +else: + # Small enough to send in one request + client.ingest(entities=entities) +``` + + ### Dry run mode `DiodeDryRunClient` generates ingestion requests without contacting a Diode server. Requests are printed to stdout by default, or written to JSON files when `output_dir` (or the `DIODE_DRY_RUN_OUTPUT_DIR` environment variable) is specified. The `app_name` parameter serves as the filename prefix; if not provided, `dryrun` is used as the default prefix. The file name is suffixed with a nanosecond-precision timestamp, resulting in the format `_.json`. diff --git a/netboxlabs/diode/sdk/chunking.py b/netboxlabs/diode/sdk/chunking.py index 4d89813..4f47ad3 100644 --- a/netboxlabs/diode/sdk/chunking.py +++ b/netboxlabs/diode/sdk/chunking.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2024 NetBox Labs Inc +# Copyright 2026 NetBox Labs Inc """Message chunking utilities for Diode SDK. This module provides utilities for chunking large lists of entities into diff --git a/tests/test_chunking.py b/tests/test_chunking.py index 6688271..519994c 100644 --- a/tests/test_chunking.py +++ b/tests/test_chunking.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2024 NetBox Labs Inc +# Copyright 2026 NetBox Labs Inc """Tests for message chunking utilities.""" from unittest.mock import patch From 06bea68a5e8b294d1e40935f49399c6b885dfd37 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Thu, 15 Jan 2026 09:06:21 +0100 Subject: [PATCH 3/4] fix lint --- netboxlabs/diode/sdk/chunking.py | 13 +++++++++---- netboxlabs/diode/sdk/client.py | 2 +- tests/test_chunking.py | 3 ++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/netboxlabs/diode/sdk/chunking.py b/netboxlabs/diode/sdk/chunking.py index 4f47ad3..9580ddc 100644 --- a/netboxlabs/diode/sdk/chunking.py +++ b/netboxlabs/diode/sdk/chunking.py @@ -1,13 +1,14 @@ #!/usr/bin/env python # Copyright 2026 NetBox Labs Inc -"""Message chunking utilities for Diode SDK. +""" +Message chunking utilities for Diode SDK. This module provides utilities for chunking large lists of entities into size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds the gRPC message size limit. """ -from typing import Iterable +from collections.abc import Iterable from .diode.v1 import ingester_pb2 @@ -15,7 +16,8 @@ def create_message_chunks( entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0 ) -> list[list[ingester_pb2.Entity]]: - """Create size-aware chunks from entities using greedy bin-packing. + """ + Create size-aware chunks from entities using greedy bin-packing. This function chunks entities to ensure each chunk stays under the specified size limit. It uses a greedy bin-packing algorithm that accumulates entities @@ -40,6 +42,7 @@ def create_message_chunks( >>> # Use a custom chunk size >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5) + """ # Convert iterable to list if necessary for size estimation if not isinstance(entities, list): @@ -85,7 +88,8 @@ def create_message_chunks( def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int: - """Estimate the serialized size of entities in bytes. + """ + Estimate the serialized size of entities in bytes. Calculates the total size by summing individual entity sizes plus the IngestRequest protobuf overhead. @@ -101,6 +105,7 @@ def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int: >>> size_bytes = estimate_message_size(entities) >>> size_mb = size_bytes / (1024 * 1024) >>> print(f"Estimated size: {size_mb:.2f} MB") + """ # Convert iterable to list if necessary if not isinstance(entities, list): diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py index e6c8cf5..9ece94d 100644 --- a/netboxlabs/diode/sdk/client.py +++ b/netboxlabs/diode/sdk/client.py @@ -668,7 +668,7 @@ def __init__( options=channel_opts, ) else: - _LOGGER.debug(f"Setting up gRPC insecure channel") + _LOGGER.debug("Setting up gRPC insecure channel") base_channel = grpc.insecure_channel( target=self._target, options=channel_opts, diff --git a/tests/test_chunking.py b/tests/test_chunking.py index 519994c..c440a70 100644 --- a/tests/test_chunking.py +++ b/tests/test_chunking.py @@ -179,7 +179,8 @@ def entity_generator(): def test_create_message_chunks_single_large_entity(): - """Test create_message_chunks with a single entity that exceeds chunk size. + """ + Test create_message_chunks with a single entity that exceeds chunk size. This edge case verifies the function doesn't fail when a single entity is larger than the chunk size limit. From 802cd16ceffbdfce56e1ce131cfb6d97108853d8 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Thu, 15 Jan 2026 09:23:10 +0100 Subject: [PATCH 4/4] clarify docs --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 40423b7..a82f135 100644 --- a/README.md +++ b/README.md @@ -346,7 +346,8 @@ print(f"Total size: {size_mb:.2f} MB") # Decide whether chunking is needed if size_mb > 3.0: - chunks = create_message_chunks(entities) + for chunk in create_message_chunks(entities): + client.ingest(entities=chunk) else: # Small enough to send in one request client.ingest(entities=entities)