diff --git a/README.md b/README.md
index 0df50d2..a82f135 100644
--- a/README.md
+++ b/README.md
@@ -281,6 +281,79 @@ client = DiodeClient(
 )
 ```
 
+### Message chunking
+
+When ingesting large numbers of entities, you may need to split them into smaller chunks to avoid exceeding the gRPC message size limit for a single `ingest()` call. The SDK provides chunking utilities that automatically split entity lists into appropriately sized chunks.
+
+#### How it works
+
+The SDK uses a **greedy bin-packing algorithm** that:
+
+1. Accumulates entities until adding the next entity would exceed the size limit
+2. Starts a new chunk when the limit would be exceeded
+3. Ensures each chunk stays safely under the configured limit (default: 3 MB)
+
+#### Basic usage
+
+```python
+from netboxlabs.diode.sdk import DiodeClient, create_message_chunks
+from netboxlabs.diode.sdk.ingester import Device, Entity
+
+with DiodeClient(
+    target="grpc://localhost:8080/diode",
+    app_name="my-app",
+    app_version="1.0.0",
+) as client:
+    # Create a large list of entities
+    entities = []
+    for i in range(10000):
+        device = Device(
+            name=f"Device {i}",
+            device_type="Device Type A",
+            site="Site ABC",
+            role="Role ABC",
+        )
+        entities.append(Entity(device=device))
+
+    # Split into chunks (default 3 MB per chunk), then ingest each chunk separately.
+    for chunk in create_message_chunks(entities):
+        client.ingest(entities=chunk)
+```
+
+#### Custom chunk size
+
+You can customize the chunk size if needed:
+
+```python
+from netboxlabs.diode.sdk import create_message_chunks
+
+# Use a larger chunk size (3.5 MB)
+chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)
+
+# Use a smaller chunk size for conservative chunking (2 MB)
+chunks = create_message_chunks(entities, max_chunk_size_mb=2.0)
+```
+
+#### Estimating message size
+
+You can estimate the serialized size of entities before chunking:
+
+```python
+from netboxlabs.diode.sdk import create_message_chunks, estimate_message_size
+
+# "client" is an open DiodeClient instance (see "Basic usage" above)
+size_bytes = estimate_message_size(entities)
+size_mb = size_bytes / (1024 * 1024)
+print(f"Total size: {size_mb:.2f} MB")
+
+# Decide whether chunking is needed
+if size_mb > 3.0:
+    for chunk in create_message_chunks(entities):
+        client.ingest(entities=chunk)
+else:
+    # Small enough to send in one request
+    client.ingest(entities=entities)
+```
+
 
 ### Dry run mode
 
 `DiodeDryRunClient` generates ingestion requests without contacting a Diode server. Requests are printed to stdout by default, or written to JSON files when `output_dir` (or the `DIODE_DRY_RUN_OUTPUT_DIR` environment variable) is specified. The `app_name` parameter serves as the filename prefix; if not provided, `dryrun` is used as the default prefix. The file name is suffixed with a nanosecond-precision timestamp, resulting in the format `<prefix>_<timestamp>.json`.
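+
+A minimal dry run sketch, assuming only the parameters described above (`app_name` as the filename prefix, `output_dir` for the JSON output location); the directory path below is illustrative:
+
+```python
+from netboxlabs.diode.sdk import DiodeDryRunClient
+from netboxlabs.diode.sdk.ingester import Device, Entity
+
+# With output_dir set, requests are written to JSON files;
+# omit it to print them to stdout instead.
+client = DiodeDryRunClient(app_name="my-app", output_dir="/tmp/diode-dry-run")
+
+client.ingest(entities=[Entity(device=Device(name="Device A"))])
+```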
diff --git a/netboxlabs/diode/sdk/__init__.py b/netboxlabs/diode/sdk/__init__.py
index ac8d2ff..58798fc 100644
--- a/netboxlabs/diode/sdk/__init__.py
+++ b/netboxlabs/diode/sdk/__init__.py
@@ -2,6 +2,10 @@
 # Copyright 2024 NetBox Labs Inc
 """NetBox Labs, Diode - SDK."""
 
+from netboxlabs.diode.sdk.chunking import (
+    create_message_chunks,
+    estimate_message_size,
+)
 from netboxlabs.diode.sdk.client import (
     DiodeClient,
     DiodeDryRunClient,
@@ -9,6 +13,8 @@
     load_dryrun_entities,
 )
 
+assert create_message_chunks
+assert estimate_message_size
 assert DiodeClient
 assert DiodeDryRunClient
 assert DiodeOTLPClient
diff --git a/netboxlabs/diode/sdk/chunking.py b/netboxlabs/diode/sdk/chunking.py
new file mode 100644
index 0000000..9580ddc
--- /dev/null
+++ b/netboxlabs/diode/sdk/chunking.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+# Copyright 2026 NetBox Labs Inc
+"""
+Message chunking utilities for Diode SDK.
+
+This module provides utilities for chunking large lists of entities into
+size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds
+the gRPC message size limit.
+"""
+
+from collections.abc import Iterable
+
+from .diode.v1 import ingester_pb2
+
+
+def create_message_chunks(
+    entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0
+) -> list[list[ingester_pb2.Entity]]:
+    """
+    Create size-aware chunks from entities using greedy bin-packing.
+
+    This function chunks entities to ensure each chunk stays under the specified
+    size limit. It uses a greedy bin-packing algorithm that accumulates entities
+    until adding the next entity would exceed the limit, then starts a new chunk.
+
+    The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB
+    message size limit, accounting for protobuf serialization overhead.
+
+    Args:
+        entities: Iterable of Entity protobuf messages to chunk
+        max_chunk_size_mb: Maximum chunk size in MB (default 3.0)
+
+    Returns:
+        List of entity chunks, each under max_chunk_size_mb. Returns at least
+        one chunk even if the input is empty.
+
+    Examples:
+        >>> entities = [entity1, entity2, entity3, ...]
+        >>> chunks = create_message_chunks(entities)
+        >>> for chunk in chunks:
+        ...     client.ingest(chunk)
+
+        >>> # Use a custom chunk size
+        >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)
+
+    """
+    # Convert iterable to list if necessary for size estimation
+    if not isinstance(entities, list):
+        entities = list(entities)
+
+    if not entities:
+        return [entities]
+
+    # Convert MB to bytes
+    max_chunk_size_bytes = int(max_chunk_size_mb * 1024 * 1024)
+
+    # Quick check: if all entities fit in one chunk, return early
+    total_size = estimate_message_size(entities)
+    if total_size <= max_chunk_size_bytes:
+        return [entities]
+
+    # Greedy bin-packing: accumulate entities until limit reached
+    base_overhead = ingester_pb2.IngestRequest().ByteSize()
+    chunks = []
+    current_chunk: list[ingester_pb2.Entity] = []
+    current_chunk_size = base_overhead  # Start with overhead for the chunk
+
+    for entity in entities:
+        entity_size = entity.ByteSize()
+        projected_size = current_chunk_size + entity_size
+
+        # Check if adding this entity would exceed limit
+        if current_chunk and projected_size > max_chunk_size_bytes:
+            # Finalize current chunk and start new one
+            chunks.append(current_chunk)
+            current_chunk = [entity]
+            current_chunk_size = base_overhead + entity_size
+        else:
+            # Add entity to current chunk
+            current_chunk.append(entity)
+            current_chunk_size = projected_size
+
+    # Add final chunk if not empty
+    if current_chunk:
+        chunks.append(current_chunk)
+
+    return chunks if chunks else [entities]
+
+
+def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int:
+    """
+    Estimate the serialized size of entities in bytes.
+
+    Calculates the total size by summing individual entity sizes plus the
+    IngestRequest protobuf overhead.
+
+    Args:
+        entities: Iterable of Entity protobuf messages
+
+    Returns:
+        Estimated size in bytes including IngestRequest overhead
+
+    Examples:
+        >>> entities = [entity1, entity2, entity3]
+        >>> size_bytes = estimate_message_size(entities)
+        >>> size_mb = size_bytes / (1024 * 1024)
+        >>> print(f"Estimated size: {size_mb:.2f} MB")
+
+    """
+    # Convert iterable to list if necessary
+    if not isinstance(entities, list):
+        entities = list(entities)
+
+    base_overhead = ingester_pb2.IngestRequest().ByteSize()
+    entity_sizes_sum = sum(entity.ByteSize() for entity in entities)
+    return base_overhead + entity_sizes_sum
diff --git a/netboxlabs/diode/sdk/client.py b/netboxlabs/diode/sdk/client.py
index e6c8cf5..9ece94d 100644
--- a/netboxlabs/diode/sdk/client.py
+++ b/netboxlabs/diode/sdk/client.py
@@ -668,7 +668,7 @@ def __init__(
                 options=channel_opts,
             )
         else:
-            _LOGGER.debug(f"Setting up gRPC insecure channel")
+            _LOGGER.debug("Setting up gRPC insecure channel")
             base_channel = grpc.insecure_channel(
                 target=self._target,
                 options=channel_opts,
diff --git a/tests/test_chunking.py b/tests/test_chunking.py
new file mode 100644
index 0000000..c440a70
--- /dev/null
+++ b/tests/test_chunking.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+# Copyright 2026 NetBox Labs Inc
+"""Tests for message chunking utilities."""
+
+from unittest.mock import patch
+
+from netboxlabs.diode.sdk.chunking import create_message_chunks, estimate_message_size
+from netboxlabs.diode.sdk.diode.v1 import ingester_pb2
+
+
+def test_create_message_chunks_empty_list():
+    """Test create_message_chunks with an empty entity list."""
+    entities = []
+    chunks = create_message_chunks(entities)
+
+    assert len(chunks) == 1
+    assert chunks[0] == []
+
+
+def test_create_message_chunks_single_chunk():
+    """Test create_message_chunks when entities fit in a single chunk."""
+    # Create small mock entities that will fit in one chunk
+    entities = []
+    for i in range(5):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"test_device_{i}"
+        entities.append(entity)
+
+    # Mock size to be small (under the 3 MB default)
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=1024):
+        chunks = create_message_chunks(entities)
+
+    assert len(chunks) == 1
+    assert len(chunks[0]) == 5
+    assert chunks[0] == entities
+
+
+def test_create_message_chunks_multiple_chunks():
+    """Test create_message_chunks when entities need to be split into multiple chunks."""
+    # Create entities that will exceed the target size
+    entities = []
+    for i in range(10):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"test_device_{i}"
+        entities.append(entity)
+
+    # Mock size to be larger than target (3 MB default)
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024):
+        # Also need to mock ByteSize for individual entities and base overhead
+        with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000):  # ~600 KB each
+            with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
+                chunks = create_message_chunks(entities)
+
+    # Should have multiple chunks
+    assert len(chunks) > 1
+
+    # All entities should be present across chunks
+    total_entities = sum(len(chunk) for chunk in chunks)
+    assert total_entities == 10
+
+    # Each chunk should have at least 1 entity
+    for chunk in chunks:
+        assert len(chunk) >= 1
+
+
+def test_create_message_chunks_one_entity_per_chunk():
+    """Test create_message_chunks when each entity needs its own chunk."""
+    entities = []
+    for i in range(3):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"large_device_{i}"
+        entities.append(entity)
+
+    # Mock very large size to force one entity per chunk
+    # Each entity is ~3.5 MB, forcing one per chunk with the 3 MB limit
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=20 * 1024 * 1024):
+        with patch.object(ingester_pb2.Entity, "ByteSize", return_value=3 * 1024 * 1024 + 500000):
+            with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
+                chunks = create_message_chunks(entities)
+
+    # Should have 3 chunks with 1 entity each
+    assert len(chunks) == 3
+    for chunk in chunks:
+        assert len(chunk) == 1
+
+
+def test_estimate_message_size():
+    """Test the estimate_message_size function."""
+    # Create mock entities
+    entities = []
+    for i in range(3):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"test_device_{i}"
+        entities.append(entity)
+
+    # Call the function
+    size = estimate_message_size(entities)
+
+    # Should return a positive integer
+    assert isinstance(size, int)
+    assert size > 0
+
+
+def test_estimate_message_size_empty_list():
+    """Test estimate_message_size with an empty entity list."""
+    entities = []
+
+    size = estimate_message_size(entities)
+
+    # Should return just the IngestRequest base overhead, which may be zero
+    assert isinstance(size, int)
+    assert size >= 0
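+
+
+def test_estimate_message_size_close_to_actual():
+    """
+    Sketch of an additional check (not in the original suite): the estimate
+    should not exceed the size of an actually serialized IngestRequest,
+    since it omits the per-entity repeated-field framing bytes.
+    """
+    entities = []
+    for i in range(10):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"test_device_{i}"
+        entities.append(entity)
+
+    estimated = estimate_message_size(entities)
+    actual = ingester_pb2.IngestRequest(entities=entities).ByteSize()
+
+    # Framing overhead grows with the number of entities, so the
+    # estimate is a lower bound on the real request size
+    assert estimated <= actual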
+
+
+def test_create_message_chunks_custom_chunk_size():
+    """Test create_message_chunks with a custom chunk size."""
+    # Create entities
+    entities = []
+    for i in range(10):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"test_device_{i}"
+        entities.append(entity)
+
+    # Use a 3.5 MB chunk size (like orb-discovery)
+    # Mock size estimation to return 5 MB (exceeds the 3.5 MB limit)
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024):
+        with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000):  # ~600 KB each
+            with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
+                chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)
+
+    # Should have multiple chunks due to the size limit
+    assert len(chunks) > 1
+
+    # All entities should be present
+    total_entities = sum(len(chunk) for chunk in chunks)
+    assert total_entities == 10
+
+
+def test_create_message_chunks_preserves_order():
+    """Test that create_message_chunks preserves entity order."""
+    # Create entities with identifiable names
+    entities = []
+    for i in range(20):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"device_{i:03d}"
+        entities.append(entity)
+
+    # Mock to force multiple chunks
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=10 * 1024 * 1024):
+        with patch.object(ingester_pb2.Entity, "ByteSize", return_value=600000):
+            with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
+                chunks = create_message_chunks(entities)
+
+    # Flatten chunks and verify order
+    flattened = []
+    for chunk in chunks:
+        flattened.extend(chunk)
+
+    assert len(flattened) == 20
+    for i, entity in enumerate(flattened):
+        assert entity.device.name == f"device_{i:03d}"
+
+
+def test_create_message_chunks_with_iterable():
+    """Test create_message_chunks with a generator/iterator input."""
+    # Create a generator of entities
+    def entity_generator():
+        for i in range(5):
+            entity = ingester_pb2.Entity()
+            entity.device.name = f"test_device_{i}"
+            yield entity
+
+    # Should work with a generator (converted to a list internally)
+    chunks = create_message_chunks(entity_generator())
+
+    assert len(chunks) >= 1
+    total_entities = sum(len(chunk) for chunk in chunks)
+    assert total_entities == 5
+
+
+def test_create_message_chunks_single_large_entity():
+    """
+    Test create_message_chunks with a single entity that exceeds chunk size.
+
+    This edge case verifies the function doesn't fail when a single entity
+    is larger than the chunk size limit.
+    """
+    entity = ingester_pb2.Entity()
+    entity.device.name = "huge_device"
+    entities = [entity]
+
+    # Mock a very large entity (5 MB) that exceeds the 3 MB limit
+    with patch("netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=5 * 1024 * 1024):
+        with patch.object(ingester_pb2.Entity, "ByteSize", return_value=5 * 1024 * 1024):
+            with patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
+                chunks = create_message_chunks(entities)
+
+    # Should still return one chunk with the single entity
+    assert len(chunks) == 1
+    assert len(chunks[0]) == 1
+    assert chunks[0][0].device.name == "huge_device"
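+
+
+def test_create_message_chunks_respects_size_limit():
+    """
+    Sketch of an additional, unmocked check (not in the original suite):
+    every produced chunk should stay within the configured limit, as
+    measured by estimate_message_size, whenever no single entity exceeds
+    the limit on its own. The tiny limit below is an illustrative choice
+    to force chunking with real entity sizes.
+    """
+    entities = []
+    for i in range(200):
+        entity = ingester_pb2.Entity()
+        entity.device.name = f"device_with_a_reasonably_long_name_{i:05d}"
+        entities.append(entity)
+
+    limit_mb = 1.0 / 1024  # exactly 1 KiB, far below any realistic setting
+    chunks = create_message_chunks(entities, max_chunk_size_mb=limit_mb)
+
+    assert len(chunks) > 1
+    assert sum(len(chunk) for chunk in chunks) == len(entities)
+
+    # estimate_message_size applies the same arithmetic as the chunker,
+    # so each chunk should respect the byte limit it was packed against
+    max_bytes = int(limit_mb * 1024 * 1024)
+    for chunk in chunks:
+        assert estimate_message_size(chunk) <= max_bytes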