From 9a4b91de21a8f02dd7d5fb93854f53f17a7d76e9 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 13:50:24 -0800 Subject: [PATCH 1/6] deletion vector write --- pyiceberg/table/puffin.py | 126 ++++++++++++++++++++++++++++++++++++- tests/table/test_puffin.py | 77 ++++++++++++++++++++++- 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 917d387f45..da0074a954 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import io import math -from typing import TYPE_CHECKING, Literal +import zlib +from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -27,6 +29,7 @@ # Short for: Puffin Fratercula arctica, version 1 MAGIC_BYTES = b"PFA1" +DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64" EMPTY_BITMAP = FrozenBitMap() MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" @@ -62,6 +65,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps +def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: + """ + Serializes a dictionary of bitmaps into a byte array. + + The format is: + - 8 bytes: number of bitmaps (little-endian) + - For each bitmap: + - 4 bytes: key (little-endian) + - n bytes: serialized bitmap + """ + with io.BytesIO() as out: + sorted_keys = sorted(bitmaps.keys()) + + # number of bitmaps + out.write(len(sorted_keys).to_bytes(8, "little")) + + for key in sorted_keys: + if key < 0: + raise ValueError(f"Invalid unsigned key: {key}") + if key > MAX_JAVA_SIGNED: + raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl") + + # key + out.write(key.to_bytes(4, "little")) + # bitmap + out.write(bitmaps[key].serialize()) + return out.getvalue() + + class PuffinBlobMetadata(IcebergBaseModel): type: Literal["deletion-vector-v1"] = Field() fields: list[int] = Field() @@ -114,3 +146,95 @@ def __init__(self, puffin: bytes) -> None: def to_vector(self) -> dict[str, "pa.ChunkedArray"]: return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()} + + +class PuffinWriter: + _blobs: List[PuffinBlobMetadata] + _blob_payloads: List[bytes] + + def __init__(self) -> None: + self._blobs = [] + self._blob_payloads = [] + + def add( + self, + positions: Iterable[int], + referenced_data_file: str, + ) -> None: + # 1. Create bitmaps from positions + bitmaps: Dict[int, BitMap] = {} + cardinality = 0 + for pos in positions: + cardinality += 1 + key = pos >> 32 + low_bits = pos & 0xFFFFFFFF + if key not in bitmaps: + bitmaps[key] = BitMap() + bitmaps[key].add(low_bits) + + # 2. Serialize bitmaps for the vector payload + vector_payload = _serialize_bitmaps(bitmaps) + + # 3. 
Construct the full blob payload for deletion-vector-v1 + with io.BytesIO() as blob_payload_buffer: + # Magic bytes for DV + blob_payload_buffer.write(DELETION_VECTOR_MAGIC) + # The vector itself + blob_payload_buffer.write(vector_payload) + + # The content for CRC calculation + crc_content = blob_payload_buffer.getvalue() + crc32 = zlib.crc32(crc_content) + + # The full blob to be stored in the Puffin file + with io.BytesIO() as full_blob_buffer: + # Combined length of the vector and magic bytes stored as 4 bytes, big-endian + full_blob_buffer.write(len(crc_content).to_bytes(4, "big")) + # The content (magic + vector) + full_blob_buffer.write(crc_content) + # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian + full_blob_buffer.write(crc32.to_bytes(4, "big")) + + self._blob_payloads.append(full_blob_buffer.getvalue()) + + # 4. Create blob metadata + properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)} + + self._blobs.append( + PuffinBlobMetadata( + type="deletion-vector-v1", + fields=[], + snapshot_id=-1, + sequence_number=-1, + offset=0, # Will be set later + length=0, # Will be set later + properties=properties, + compression_codec=None, # Explicitly None + ) + ) + + def finish(self) -> bytes: + with io.BytesIO() as out: + payload_buffer = io.BytesIO() + for blob_payload in self._blob_payloads: + payload_buffer.write(blob_payload) + + # Set offsets and lengths in metadata + current_offset = 4 # Start after file magic + for i, blob_payload in enumerate(self._blob_payloads): + self._blobs[i].offset = current_offset + self._blobs[i].length = len(blob_payload) + current_offset += len(blob_payload) + + footer = Footer(blobs=self._blobs) + footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") + + # Final assembly + out.write(MAGIC_BYTES) + out.write(payload_buffer.getvalue()) + out.write(footer_payload_bytes) + out.write(len(footer_payload_bytes).to_bytes(4, "little")) + out.write((0).to_bytes(4, "little")) # flags + out.write(MAGIC_BYTES) + + return out.getvalue() diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index bf8c82014c..0e9c881860 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter def _open_file(file: str) -> bytes: @@ -71,3 +71,78 @@ def test_map_high_vals() -> None: with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"): _ = _deserialize_bitmap(puffin) + + +def test_puffin_round_trip(): + # Define some deletion positions for multiple files + deletions1 = [10, 20, 30] + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + + file1_path = "path/to/data1.parquet" + file2_path = "path/to/data2.parquet" + + # Write the Puffin file + writer = PuffinWriter() + writer.add(positions=deletions1, referenced_data_file=file1_path) + writer.add(positions=deletions2, referenced_data_file=file2_path) + puffin_bytes = writer.finish() + + # Read the Puffin file back + reader = PuffinFile(puffin_bytes) + + # Assert footer metadata + assert len(reader.footer.blobs) == 2 + + blob1_meta = reader.footer.blobs[0] + assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties["cardinality"] == str(len(deletions1)) + 
+ blob2_meta = reader.footer.blobs[1] + assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties["cardinality"] == str(len(deletions2)) + + # Assert the content of deletion vectors + read_vectors = reader.to_vector() + + assert file1_path in read_vectors + assert file2_path in read_vectors + + assert read_vectors[file1_path].to_pylist() == sorted(deletions1) + assert read_vectors[file2_path].to_pylist() == sorted(deletions2) + + +def test_write_and_read_puffin_file(): + writer = PuffinWriter() + writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") + writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + + assert len(reader.footer.blobs) == 2 + blob1 = reader.footer.blobs[0] + blob2 = reader.footer.blobs[1] + + assert blob1.properties["referenced-data-file"] == "file1.parquet" + assert blob1.properties["cardinality"] == "3" + assert blob1.type == "deletion-vector-v1" + assert blob1.snapshot_id == -1 + assert blob1.sequence_number == -1 + assert blob1.compression_codec is None + + assert blob2.properties["referenced-data-file"] == "file2.parquet" + assert blob2.properties["cardinality"] == "3" + + vectors = reader.to_vector() + assert len(vectors) == 2 + assert vectors["file1.parquet"].to_pylist() == [1, 2, 3] + assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] + + +def test_puffin_file_with_no_blobs(): + writer = PuffinWriter() + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + assert len(reader.footer.blobs) == 0 + assert len(reader.to_vector()) == 0 From 4db1734cd4ee7d3c7fcfaa4e61e26cb8f0002c0c Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:10:57 -0800 Subject: [PATCH 2/6] test fix --- pyiceberg/table/puffin.py | 12 +++++++----- tests/table/test_puffin.py | 6 +++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index da0074a954..8a7d4c2215 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -219,14 +219,16 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - # Set offsets and lengths in metadata - current_offset = 4 # Start after file magic + updated_blobs_metadata: List[PuffinBlobMetadata] = [] + current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): - self._blobs[i].offset = current_offset - self._blobs[i].length = len(blob_payload) + original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) + original_metadata_dict["offset"] = current_offset + original_metadata_dict["length"] = len(blob_payload) + updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer(blobs=self._blobs) + footer = Footer(blobs=updated_blobs_metadata) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 0e9c881860..c71afd24af 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE def _open_file(file: str) -> 
bytes: @@ -94,11 +94,11 @@ def test_puffin_round_trip(): assert len(reader.footer.blobs) == 2 blob1_meta = reader.footer.blobs[0] - assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) blob2_meta = reader.footer.blobs[1] - assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path assert blob2_meta.properties["cardinality"] == str(len(deletions2)) # Assert the content of deletion vectors From 71dd92510dee5785a1fe6a9e1eec804806ee6b29 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:17:09 -0800 Subject: [PATCH 3/6] lint fixes --- pyiceberg/table/puffin.py | 15 ++++++++------- tests/table/test_puffin.py | 14 +++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 8a7d4c2215..c54173e01a 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -17,7 +17,8 @@ import io import math import zlib -from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional +from collections.abc import Iterable +from typing import TYPE_CHECKING, Literal from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -65,9 +66,9 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps -def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: +def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes: """ - Serializes a dictionary of bitmaps into a byte array. + Serialize a dictionary of bitmaps into a byte array. The format is: - 8 bytes: number of bitmaps (little-endian) @@ -149,8 +150,8 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: - _blobs: List[PuffinBlobMetadata] - _blob_payloads: List[bytes] + _blobs: list[PuffinBlobMetadata] + _blob_payloads: list[bytes] def __init__(self) -> None: self._blobs = [] @@ -162,7 +163,7 @@ def add( referenced_data_file: str, ) -> None: # 1. 
Create bitmaps from positions - bitmaps: Dict[int, BitMap] = {} + bitmaps: dict[int, BitMap] = {} cardinality = 0 for pos in positions: cardinality += 1 @@ -219,7 +220,7 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - updated_blobs_metadata: List[PuffinBlobMetadata] = [] + updated_blobs_metadata: list[PuffinBlobMetadata] = [] current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index c71afd24af..403b2e038f 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE +from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap def _open_file(file: str) -> bytes: @@ -73,10 +73,10 @@ def test_map_high_vals() -> None: _ = _deserialize_bitmap(puffin) -def test_puffin_round_trip(): +def test_puffin_round_trip() -> None: # Define some deletion positions for multiple files deletions1 = [10, 20, 30] - deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position file1_path = "path/to/data1.parquet" file2_path = "path/to/data2.parquet" @@ -92,7 +92,7 @@ def test_puffin_round_trip(): # Assert footer metadata assert len(reader.footer.blobs) == 2 - + blob1_meta = reader.footer.blobs[0] assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) @@ -103,7 +103,7 @@ def test_puffin_round_trip(): # Assert the content of deletion vectors read_vectors = reader.to_vector() - + assert file1_path in read_vectors assert file2_path in read_vectors @@ -111,7 +111,7 @@ def test_puffin_round_trip(): assert read_vectors[file2_path].to_pylist() == sorted(deletions2) -def test_write_and_read_puffin_file(): +def test_write_and_read_puffin_file() -> None: writer = PuffinWriter() writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") @@ -139,7 +139,7 @@ def test_write_and_read_puffin_file(): assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] -def test_puffin_file_with_no_blobs(): +def test_puffin_file_with_no_blobs() -> None: writer = PuffinWriter() puffin_bytes = writer.finish() From 3efd28ed1958ae30cb149bd067a97d3074f04c54 Mon Sep 17 00:00:00 2001 From: Gabriel Lesperance Date: Wed, 10 Dec 2025 21:13:10 -0500 Subject: [PATCH 4/6] test: Add Spark interop test for Puffin DV reader Verify pyiceberg's PuffinFile reader can parse deletion vectors written by Spark. Uses coalesce(1) to force Spark to create DVs instead of COW. --- .../integration/test_puffin_spark_interop.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 tests/integration/test_puffin_spark_interop.py diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py new file mode 100644 index 0000000000..be19276cd4 --- /dev/null +++ b/tests/integration/test_puffin_spark_interop.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog.rest import RestCatalog +from pyiceberg.manifest import ManifestContent +from pyiceberg.table.puffin import PuffinFile + + +def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: + for sql in sqls: + spark.sql(sql) + + +@pytest.mark.integration +def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None: + """Verify pyiceberg can read Puffin DVs written by Spark.""" + identifier = "default.spark_puffin_format_test" + + run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"]) + run_spark_commands( + spark, + [ + f""" + CREATE TABLE {identifier} (id BIGINT) + USING iceberg + TBLPROPERTIES ( + 'format-version' = '3', + 'write.delete.mode' = 'merge-on-read' + ) + """, + ], + ) + + df = spark.range(1, 51) + df.coalesce(1).writeTo(identifier).append() + + files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect() + assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}" + + run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"]) + + table = session_catalog.load_table(identifier) + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + + manifests = current_snapshot.manifests(table.io) + delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES] + assert len(delete_manifests) > 0, "Expected delete manifest with DVs" + + delete_manifest = delete_manifests[0] + entries = list(delete_manifest.fetch_manifest_entry(table.io)) + assert len(entries) > 0, "Expected at least one delete file entry" + + delete_entry = entries[0] + puffin_path = delete_entry.data_file.file_path + assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}" + + input_file = table.io.new_input(puffin_path) + with input_file.open() as f: + puffin_bytes = f.read() + + puffin = PuffinFile(puffin_bytes) + + assert len(puffin.footer.blobs) == 1, "Expected exactly one blob" + + blob = puffin.footer.blobs[0] + assert blob.type == "deletion-vector-v1" + assert "referenced-data-file" in blob.properties + assert blob.properties["cardinality"] == "4" + + dv_dict = puffin.to_vector() + assert len(dv_dict) == 1, "Expected one data file's deletions" + + for data_file_path, chunked_array in dv_dict.items(): + positions = chunked_array.to_pylist() + assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}" + assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}" From 228263c37c46a5637ae45f17679859c379bcaf5c Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 11 Dec 2025 16:43:05 -0800 Subject: [PATCH 5/6] PR comments --- pyiceberg/table/puffin.py | 28 +++++++++++------ tests/table/test_puffin.py | 62 +++++++++++++++----------------------- 2 
files changed, 44 insertions(+), 46 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index c54173e01a..18516b33fc 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -152,27 +152,34 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: _blobs: list[PuffinBlobMetadata] _blob_payloads: list[bytes] + _created_by: str | None - def __init__(self) -> None: + def __init__(self, created_by: str | None = None) -> None: self._blobs = [] self._blob_payloads = [] + self._created_by = created_by - def add( + def set_blob( self, positions: Iterable[int], referenced_data_file: str, ) -> None: + # We only support one blob at the moment + self._blobs = [] + self._blob_payloads = [] + # 1. Create bitmaps from positions bitmaps: dict[int, BitMap] = {} - cardinality = 0 for pos in positions: - cardinality += 1 key = pos >> 32 low_bits = pos & 0xFFFFFFFF if key not in bitmaps: bitmaps[key] = BitMap() bitmaps[key].add(low_bits) + # Calculate the cardinality from the bitmaps + cardinality = sum(len(bm) for bm in bitmaps.values()) + # 2. Serialize bitmaps for the vector payload vector_payload = _serialize_bitmaps(bitmaps) @@ -204,13 +211,13 @@ def add( self._blobs.append( PuffinBlobMetadata( type="deletion-vector-v1", - fields=[], + fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors snapshot_id=-1, sequence_number=-1, - offset=0, # Will be set later - length=0, # Will be set later + offset=0, # TODO: Use DeleteFileIndex data + length=0, # TODO: Use DeleteFileIndex data properties=properties, - compression_codec=None, # Explicitly None + compression_codec=None, ) ) @@ -229,12 +236,15 @@ def finish(self) -> bytes: updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer(blobs=updated_blobs_metadata) + footer = Footer( + blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {} + ) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly out.write(MAGIC_BYTES) out.write(payload_buffer.getvalue()) + out.write(MAGIC_BYTES) out.write(footer_payload_bytes) out.write(len(footer_payload_bytes).to_bytes(4, "little")) out.write((0).to_bytes(4, "little")) # flags diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 403b2e038f..c39a9da0fd 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -74,68 +74,55 @@ def test_map_high_vals() -> None: def test_puffin_round_trip() -> None: - # Define some deletion positions for multiple files - deletions1 = [10, 20, 30] - deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + # Define some deletion positions for a file + deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate - file1_path = "path/to/data1.parquet" - file2_path = "path/to/data2.parquet" + file_path = "path/to/data.parquet" # Write the Puffin file - writer = PuffinWriter() - writer.add(positions=deletions1, referenced_data_file=file1_path) - writer.add(positions=deletions2, referenced_data_file=file2_path) + writer = PuffinWriter(created_by="my-test-app") + writer.set_blob(positions=deletions, referenced_data_file=file_path) puffin_bytes = writer.finish() # Read the Puffin file back reader = PuffinFile(puffin_bytes) # Assert footer metadata - assert len(reader.footer.blobs) == 2 - - blob1_meta = reader.footer.blobs[0] - assert 
blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path - assert blob1_meta.properties["cardinality"] == str(len(deletions1)) + assert reader.footer.properties["created-by"] == "my-test-app" + assert len(reader.footer.blobs) == 1 - blob2_meta = reader.footer.blobs[1] - assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path - assert blob2_meta.properties["cardinality"] == str(len(deletions2)) + blob_meta = reader.footer.blobs[0] + assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path + assert blob_meta.properties["cardinality"] == str(len(set(deletions))) # Assert the content of deletion vectors read_vectors = reader.to_vector() - assert file1_path in read_vectors - assert file2_path in read_vectors - - assert read_vectors[file1_path].to_pylist() == sorted(deletions1) - assert read_vectors[file2_path].to_pylist() == sorted(deletions2) + assert file_path in read_vectors + assert read_vectors[file_path].to_pylist() == sorted(list(set(deletions))) def test_write_and_read_puffin_file() -> None: writer = PuffinWriter() - writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") - writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") + writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet") + writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet") puffin_bytes = writer.finish() reader = PuffinFile(puffin_bytes) - assert len(reader.footer.blobs) == 2 - blob1 = reader.footer.blobs[0] - blob2 = reader.footer.blobs[1] - - assert blob1.properties["referenced-data-file"] == "file1.parquet" - assert blob1.properties["cardinality"] == "3" - assert blob1.type == "deletion-vector-v1" - assert blob1.snapshot_id == -1 - assert blob1.sequence_number == -1 - assert blob1.compression_codec is None + assert len(reader.footer.blobs) == 1 + blob = reader.footer.blobs[0] - assert blob2.properties["referenced-data-file"] == "file2.parquet" - assert blob2.properties["cardinality"] == "3" + assert blob.properties["referenced-data-file"] == "file2.parquet" + assert blob.properties["cardinality"] == "3" + assert blob.type == "deletion-vector-v1" + assert blob.snapshot_id == -1 + assert blob.sequence_number == -1 + assert blob.compression_codec is None vectors = reader.to_vector() - assert len(vectors) == 2 - assert vectors["file1.parquet"].to_pylist() == [1, 2, 3] + assert len(vectors) == 1 + assert "file1.parquet" not in vectors assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] @@ -146,3 +133,4 @@ def test_puffin_file_with_no_blobs() -> None: reader = PuffinFile(puffin_bytes) assert len(reader.footer.blobs) == 0 assert len(reader.to_vector()) == 0 + assert "created-by" not in reader.footer.properties From 36bb37f5e8bda092b8dfc6d2ce62fce08aa9b9be Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 11 Dec 2025 16:46:01 -0800 Subject: [PATCH 6/6] lint --- pyiceberg/table/puffin.py | 10 ++++------ tests/integration/test_puffin_spark_interop.py | 2 +- tests/table/test_puffin.py | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 18516b33fc..8acf21f974 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -211,11 +211,11 @@ def set_blob( self._blobs.append( PuffinBlobMetadata( type="deletion-vector-v1", - fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors + fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors snapshot_id=-1, sequence_number=-1, - 
offset=0, # TODO: Use DeleteFileIndex data - length=0, # TODO: Use DeleteFileIndex data + offset=0, # TODO: Use DeleteFileIndex data + length=0, # TODO: Use DeleteFileIndex data properties=properties, compression_codec=None, ) @@ -236,9 +236,7 @@ def finish(self) -> bytes: updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer( - blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {} - ) + footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {}) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py index be19276cd4..d4c6735fca 100644 --- a/tests/integration/test_puffin_spark_interop.py +++ b/tests/integration/test_puffin_spark_interop.py @@ -87,7 +87,7 @@ def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: Rest dv_dict = puffin.to_vector() assert len(dv_dict) == 1, "Expected one data file's deletions" - for data_file_path, chunked_array in dv_dict.items(): + for _data_file_path, chunked_array in dv_dict.items(): positions = chunked_array.to_pylist() assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}" assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}" diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index c39a9da0fd..1ea0913e29 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -99,7 +99,7 @@ def test_puffin_round_trip() -> None: read_vectors = reader.to_vector() assert file_path in read_vectors - assert read_vectors[file_path].to_pylist() == sorted(list(set(deletions))) + assert read_vectors[file_path].to_pylist() == sorted(set(deletions)) def test_write_and_read_puffin_file() -> None:
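
A minimal usage sketch of the round trip this series introduces, reflecting the set_blob() API and the footer "created-by" property from the final patches. It mirrors the behavior exercised in tests/table/test_puffin.py; the file path, positions, and application name are illustrative only and do not come from the patches themselves:

from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter

# Write a single deletion-vector blob. Positions may exceed 2**32: the high
# 32 bits select the bitmap key, the low 32 bits the offset inside that bitmap.
writer = PuffinWriter(created_by="example-app")
writer.set_blob(positions=[3, 17, (1 << 32) + 5], referenced_data_file="data/file-a.parquet")
puffin_bytes = writer.finish()

# Read it back and check the footer metadata and the materialized positions.
reader = PuffinFile(puffin_bytes)
blob = reader.footer.blobs[0]
assert blob.properties[PROPERTY_REFERENCED_DATA_FILE] == "data/file-a.parquet"
assert blob.properties["cardinality"] == "3"
assert reader.to_vector()["data/file-a.parquet"].to_pylist() == [3, 17, (1 << 32) + 5]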