From ad0d51c2bc6e9f4de1d15cd11168c5ef586d4480 Mon Sep 17 00:00:00 2001 From: geruh Date: Tue, 23 Dec 2025 16:11:27 -0800 Subject: [PATCH 1/2] feat: Add models for rest scan planning --- pyiceberg/catalog/rest/scan_planning.py | 208 +++++++++ tests/catalog/test_scan_planning_models.py | 464 +++++++++++++++++++++ 2 files changed, 672 insertions(+) create mode 100644 pyiceberg/catalog/rest/scan_planning.py create mode 100644 tests/catalog/test_scan_planning_models.py diff --git a/pyiceberg/catalog/rest/scan_planning.py b/pyiceberg/catalog/rest/scan_planning.py new file mode 100644 index 0000000000..ddccf1d9e3 --- /dev/null +++ b/pyiceberg/catalog/rest/scan_planning.py @@ -0,0 +1,208 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import date, datetime, time +from decimal import Decimal +from typing import Annotated, Generic, Literal, TypeAlias, TypeVar +from uuid import UUID + +from pydantic import Field, model_validator + +from pyiceberg.catalog.rest.response import ErrorResponseMessage +from pyiceberg.expressions import BooleanExpression +from pyiceberg.manifest import FileFormat +from pyiceberg.typedef import IcebergBaseModel + +# Primitive types that can appear in partition values and bounds +PrimitiveTypeValue: TypeAlias = bool | int | float | str | Decimal | UUID | date | time | datetime | bytes + +V = TypeVar("V") + + +class KeyValueMap(IcebergBaseModel, Generic[V]): + """Map serialized as parallel key/value arrays for column statistics.""" + + keys: list[int] = Field(default_factory=list) + values: list[V] = Field(default_factory=list) + + @model_validator(mode="after") + def _validate_lengths_match(self) -> KeyValueMap[V]: + if len(self.keys) != len(self.values): + raise ValueError(f"keys and values must have same length: {len(self.keys)} != {len(self.values)}") + return self + + def to_dict(self) -> dict[int, V]: + """Convert to dictionary mapping field ID to value.""" + return dict(zip(self.keys, self.values, strict=True)) + + +class CountMap(KeyValueMap[int]): + """Map of field IDs to counts.""" + + +class ValueMap(KeyValueMap[PrimitiveTypeValue]): + """Map of field IDs to primitive values (for lower/upper bounds).""" + + +class StorageCredential(IcebergBaseModel): + """Storage credential for accessing content files.""" + + prefix: str = Field(description="Storage location prefix this credential applies to") + config: dict[str, str] = Field(default_factory=dict) + + +class RESTContentFile(IcebergBaseModel): + """Base model for data and delete files from REST API.""" + + spec_id: int = Field(alias="spec-id") + partition: list[PrimitiveTypeValue] = Field(default_factory=list) + content: Literal["data", "position-deletes", "equality-deletes"] + file_path: str = Field(alias="file-path") + file_format: FileFormat = Field(alias="file-format") + file_size_in_bytes: int = Field(alias="file-size-in-bytes") + record_count: int = Field(alias="record-count") + key_metadata: str | None = Field(alias="key-metadata", default=None) + split_offsets: list[int] | None = Field(alias="split-offsets", default=None) + sort_order_id: int | None = Field(alias="sort-order-id", default=None) + + +class RESTDataFile(RESTContentFile): + """Data file from REST API.""" + + content: Literal["data"] = Field(default="data") + first_row_id: int | None = Field(alias="first-row-id", default=None) + column_sizes: CountMap | None = Field(alias="column-sizes", default=None) + value_counts: CountMap | None = Field(alias="value-counts", default=None) + null_value_counts: CountMap | None = Field(alias="null-value-counts", default=None) + nan_value_counts: CountMap | None = Field(alias="nan-value-counts", default=None) + lower_bounds: ValueMap | None = Field(alias="lower-bounds", default=None) + upper_bounds: ValueMap | None = Field(alias="upper-bounds", default=None) + + +class RESTPositionDeleteFile(RESTContentFile): + """Position delete file from REST API.""" + + content: Literal["position-deletes"] = Field(default="position-deletes") + referenced_data_file: str | None = Field(alias="referenced-data-file", default=None) + content_offset: int | None = Field(alias="content-offset", default=None) + content_size_in_bytes: int | None = Field(alias="content-size-in-bytes", default=None) + + +class RESTEqualityDeleteFile(RESTContentFile): + """Equality delete file from REST API.""" + + content: Literal["equality-deletes"] = Field(default="equality-deletes") + equality_ids: list[int] | None = Field(alias="equality-ids", default=None) + + +# Discriminated union for delete files +RESTDeleteFile = Annotated[ + RESTPositionDeleteFile | RESTEqualityDeleteFile, + Field(discriminator="content"), +] + + +class RESTFileScanTask(IcebergBaseModel): + """A file scan task from the REST server.""" + + data_file: RESTDataFile = Field(alias="data-file") + delete_file_references: list[int] | None = Field(alias="delete-file-references", default=None) + residual_filter: BooleanExpression | None = Field(alias="residual-filter", default=None) + + +class ScanTasks(IcebergBaseModel): + """Container for scan tasks returned by the server.""" + + delete_files: list[RESTDeleteFile] = Field(alias="delete-files", default_factory=list) + file_scan_tasks: list[RESTFileScanTask] = Field(alias="file-scan-tasks", default_factory=list) + plan_tasks: list[str] = Field(alias="plan-tasks", default_factory=list) + + @model_validator(mode="after") + def _validate_delete_file_references(self) -> ScanTasks: + # validate delete file references are in bounds + max_idx = len(self.delete_files) - 1 + for task in self.file_scan_tasks: + for idx in task.delete_file_references or []: + if idx < 0 or idx > max_idx: + raise ValueError(f"Invalid delete file reference: {idx} (valid range: 0-{max_idx})") + + if self.delete_files and not self.file_scan_tasks: + raise ValueError("Invalid response: deleteFiles should only be returned with fileScanTasks that reference them") + + return self + + +class PlanCompleted(ScanTasks): + """Completed scan plan result.""" + + status: Literal["completed"] = "completed" + plan_id: str | None = Field(alias="plan-id", default=None) + storage_credentials: list[StorageCredential] | None = Field(alias="storage-credentials", default=None) + + +class PlanSubmitted(IcebergBaseModel): + """Scan plan submitted, poll for completion.""" + + status: Literal["submitted"] = "submitted" + plan_id: str | None = Field(alias="plan-id", default=None) + + +class PlanCancelled(IcebergBaseModel): + """Planning was cancelled.""" + + status: Literal["cancelled"] = "cancelled" + + +class PlanFailed(IcebergBaseModel): + """Planning failed with error.""" + + status: Literal["failed"] = "failed" + error: ErrorResponseMessage + + +PlanningResponse = Annotated[ + PlanCompleted | PlanSubmitted | PlanCancelled | PlanFailed, + Field(discriminator="status"), +] + + +class PlanTableScanRequest(IcebergBaseModel): + """Request body for planning a REST scan.""" + + snapshot_id: int | None = Field(alias="snapshot-id", default=None) + select: list[str] | None = Field(default=None) + filter: BooleanExpression | None = Field(default=None) + case_sensitive: bool = Field(alias="case-sensitive", default=True) + use_snapshot_schema: bool = Field(alias="use-snapshot-schema", default=False) + start_snapshot_id: int | None = Field(alias="start-snapshot-id", default=None) + end_snapshot_id: int | None = Field(alias="end-snapshot-id", default=None) + stats_fields: list[str] | None = Field(alias="stats-fields", default=None) + + @model_validator(mode="after") + def _validate_snapshot_fields(self) -> PlanTableScanRequest: + if self.start_snapshot_id is not None and self.end_snapshot_id is None: + raise ValueError("end-snapshot-id is required when start-snapshot-id is specified") + if self.snapshot_id is not None and self.start_snapshot_id is not None: + raise ValueError("Cannot specify both snapshot-id and start-snapshot-id") + return self + + +class FetchScanTasksRequest(IcebergBaseModel): + """Request body for fetching scan tasks endpoint.""" + + plan_task: str = Field(alias="plan-task") diff --git a/tests/catalog/test_scan_planning_models.py b/tests/catalog/test_scan_planning_models.py new file mode 100644 index 0000000000..3f0dd6978c --- /dev/null +++ b/tests/catalog/test_scan_planning_models.py @@ -0,0 +1,464 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Any + +import pytest +from pydantic import TypeAdapter, ValidationError + +from pyiceberg.catalog.rest.scan_planning import ( + CountMap, + FetchScanTasksRequest, + PlanCancelled, + PlanCompleted, + PlanningResponse, + PlanSubmitted, + PlanTableScanRequest, + RESTDataFile, + RESTDeleteFile, + RESTEqualityDeleteFile, + RESTFileScanTask, + RESTPositionDeleteFile, + ScanTasks, + StorageCredential, + ValueMap, +) +from pyiceberg.expressions import AlwaysTrue, EqualTo, Reference +from pyiceberg.manifest import FileFormat + + +def test_count_map_valid() -> None: + cm = CountMap(keys=[1, 2, 3], values=[100, 200, 300]) + assert cm.to_dict() == {1: 100, 2: 200, 3: 300} + + +def test_count_map_empty() -> None: + cm = CountMap() + assert cm.to_dict() == {} + + +def test_count_map_length_mismatch() -> None: + with pytest.raises(ValidationError) as exc_info: + CountMap(keys=[1, 2, 3], values=[100, 200]) + assert "must have same length" in str(exc_info.value) + + +def test_value_map_mixed_types() -> None: + vm = ValueMap(keys=[1, 2, 3], values=[True, 42, "val"]) + assert vm.to_dict() == {1: True, 2: 42, 3: "val"} + + +def test_data_file_parsing() -> None: + data = { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/file.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + } + df = RESTDataFile.model_validate(data) + assert df.content == "data" + assert df.file_path == "s3://bucket/table/file.parquet" + assert df.file_format == FileFormat.PARQUET + assert df.file_size_in_bytes == 1024 + + +def test_data_file_with_stats() -> None: + data = { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/file.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + "column-sizes": {"keys": [1, 2], "values": [500, 524]}, + "value-counts": {"keys": [1, 2], "values": [100, 100]}, + } + df = RESTDataFile.model_validate(data) + assert df.column_sizes is not None + assert df.column_sizes.to_dict() == {1: 500, 2: 524} + + +def test_position_delete_file() -> None: + data = { + "spec-id": 0, + "content": "position-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 512, + "record-count": 10, + "content-offset": 100, + "content-size-in-bytes": 200, + } + pdf = RESTPositionDeleteFile.model_validate(data) + assert pdf.content == "position-deletes" + assert pdf.content_offset == 100 + assert pdf.content_size_in_bytes == 200 + + +def test_position_delete_requires_size_with_offset() -> None: + data = { + "spec-id": 0, + "content": "position-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 512, + "record-count": 10, + "content-offset": 100, + } + with pytest.raises(ValidationError) as exc_info: + RESTPositionDeleteFile.model_validate(data) + assert "content-size-in-bytes is required" in str(exc_info.value) + + +def test_equality_delete_file() -> None: + data = { + "spec-id": 0, + "content": "equality-deletes", + "file-path": "s3://bucket/table/eq-delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 256, + "record-count": 5, + "equality-ids": [1, 2], + } + edf = RESTEqualityDeleteFile.model_validate(data) + assert edf.content == "equality-deletes" + assert edf.equality_ids == [1, 2] + + +def test_file_format_case_insensitive() -> None: + for fmt in ["parquet", "PARQUET", "Parquet"]: + data = { + "spec-id": 0, + "content": "data", + "file-path": "/path", + "file-format": fmt, + "file-size-in-bytes": 100, + "record-count": 10, + } + df = RESTDataFile.model_validate(data) + assert df.file_format == FileFormat.PARQUET + + +@pytest.mark.parametrize( + "format_str,expected", + [ + ("parquet", FileFormat.PARQUET), + ("avro", FileFormat.AVRO), + ("orc", FileFormat.ORC), + ], +) +def test_file_formats(format_str: str, expected: FileFormat) -> None: + data = { + "spec-id": 0, + "content": "data", + "file-path": f"s3://bucket/table/path/file.{format_str}", + "file-format": format_str, + "file-size-in-bytes": 1024, + "record-count": 100, + } + df = RESTDataFile.model_validate(data) + assert df.file_format == expected + + +def test_delete_file_discriminator_position() -> None: + data = { + "spec-id": 0, + "content": "position-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 256, + "record-count": 5, + } + result = TypeAdapter(RESTDeleteFile).validate_python(data) + assert isinstance(result, RESTPositionDeleteFile) + + +def test_delete_file_discriminator_equality() -> None: + data = { + "spec-id": 0, + "content": "equality-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 256, + "record-count": 5, + "equality-ids": [1], + } + result = TypeAdapter(RESTDeleteFile).validate_python(data) + assert isinstance(result, RESTEqualityDeleteFile) + + +def test_basic_scan_task() -> None: + data = { + "data-file": { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/file.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + } + } + task = RESTFileScanTask.model_validate(data) + assert task.data_file.file_path == "s3://bucket/table/file.parquet" + assert task.delete_file_references is None + assert task.residual_filter is None + + +def test_scan_task_with_delete_references() -> None: + data = { + "data-file": { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/file.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + }, + "delete-file-references": [0, 1, 2], + } + task = RESTFileScanTask.model_validate(data) + assert task.delete_file_references == [0, 1, 2] + + +def test_scan_task_with_residual_filter_true() -> None: + data = { + "data-file": { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/file.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + }, + "residual-filter": True, + } + task = RESTFileScanTask.model_validate(data) + assert isinstance(task.residual_filter, AlwaysTrue) + + +def test_empty_scan_tasks() -> None: + data: dict[str, Any] = { + "delete-files": [], + "file-scan-tasks": [], + "plan-tasks": [], + } + scan_tasks = ScanTasks.model_validate(data) + assert len(scan_tasks.file_scan_tasks) == 0 + assert len(scan_tasks.delete_files) == 0 + assert len(scan_tasks.plan_tasks) == 0 + + +def test_scan_tasks_with_files() -> None: + data = { + "delete-files": [ + { + "spec-id": 0, + "content": "position-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 256, + "record-count": 5, + } + ], + "file-scan-tasks": [ + { + "data-file": { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/data.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + }, + "delete-file-references": [0], + } + ], + "plan-tasks": ["token-1", "token-2"], + } + scan_tasks = ScanTasks.model_validate(data) + assert len(scan_tasks.delete_files) == 1 + assert len(scan_tasks.file_scan_tasks) == 1 + assert len(scan_tasks.plan_tasks) == 2 + + +def test_invalid_delete_file_reference() -> None: + data = { + "delete-files": [], + "file-scan-tasks": [ + { + "data-file": { + "spec-id": 0, + "content": "data", + "file-path": "s3://bucket/table/data.parquet", + "file-format": "parquet", + "file-size-in-bytes": 1024, + "record-count": 100, + }, + "delete-file-references": [0], + } + ], + "plan-tasks": [], + } + with pytest.raises(ValidationError) as exc_info: + ScanTasks.model_validate(data) + assert "Invalid delete file reference" in str(exc_info.value) + + +def test_delete_files_require_file_scan_tasks() -> None: + data = { + "delete-files": [ + { + "spec-id": 0, + "content": "position-deletes", + "file-path": "s3://bucket/table/delete.parquet", + "file-format": "parquet", + "file-size-in-bytes": 256, + "record-count": 5, + } + ], + "file-scan-tasks": [], + "plan-tasks": [], + } + with pytest.raises(ValidationError) as exc_info: + ScanTasks.model_validate(data) + assert "deleteFiles should only be returned with fileScanTasks" in str(exc_info.value) + +def test_minimal_request() -> None: + request = PlanTableScanRequest() + dumped = request.model_dump(by_alias=True, exclude_none=True) + assert dumped == {"case-sensitive": True, "use-snapshot-schema": False} + + +def test_request_with_snapshot_id() -> None: + request = PlanTableScanRequest(snapshot_id=12345) + dumped = request.model_dump(by_alias=True, exclude_none=True) + assert dumped["snapshot-id"] == 12345 + + +def test_request_with_select_and_filter() -> None: + request = PlanTableScanRequest( + select=["id", "name"], + filter=EqualTo(Reference("id"), 42), + ) + dumped = request.model_dump(by_alias=True, exclude_none=True) + assert dumped["select"] == ["id", "name"] + assert "filter" in dumped + + +def test_incremental_scan_request() -> None: + request = PlanTableScanRequest( + start_snapshot_id=100, + end_snapshot_id=200, + ) + dumped = request.model_dump(by_alias=True, exclude_none=True) + assert dumped["start-snapshot-id"] == 100 + assert dumped["end-snapshot-id"] == 200 + + +def test_start_snapshot_requires_end_snapshot() -> None: + with pytest.raises(ValidationError) as exc_info: + PlanTableScanRequest(start_snapshot_id=100) + assert "end-snapshot-id is required" in str(exc_info.value) + + +def test_snapshot_id_conflicts_with_start_snapshot() -> None: + with pytest.raises(ValidationError) as exc_info: + PlanTableScanRequest(snapshot_id=50, start_snapshot_id=100, end_snapshot_id=200) + assert "Cannot specify both" in str(exc_info.value) + + +def test_fetch_scan_tasks_request() -> None: + request = FetchScanTasksRequest(plan_task="token-abc-123") + dumped = request.model_dump(by_alias=True) + assert dumped == {"plan-task": "token-abc-123"} + + +def test_completed_response() -> None: + data = { + "status": "completed", + "plan-id": "plan-123", + "delete-files": [], + "file-scan-tasks": [], + "plan-tasks": [], + } + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanCompleted) + assert result.plan_id == "plan-123" + + +def test_completed_response_without_plan_id() -> None: + data = { + "status": "completed", + "delete-files": [], + "file-scan-tasks": [], + "plan-tasks": [], + } + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanCompleted) + assert result.plan_id is None + + +def test_completed_response_with_credentials() -> None: + data = { + "status": "completed", + "delete-files": [], + "file-scan-tasks": [], + "plan-tasks": [], + "storage-credentials": [ + {"prefix": "s3://bucket/", "config": {}}, + ], + } + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanCompleted) + assert result.storage_credentials is not None + assert len(result.storage_credentials) == 1 + + +def test_submitted_response() -> None: + data = { + "status": "submitted", + "plan-id": "drus-plan", + } + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanSubmitted) + assert result.plan_id == "drus-plan" + + +def test_submitted_response_without_plan_id() -> None: + data = {"status": "submitted"} + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanSubmitted) + + +def test_cancelled_response() -> None: + data = {"status": "cancelled"} + result = TypeAdapter(PlanningResponse).validate_python(data) + assert isinstance(result, PlanCancelled) + + +def test_storage_credential_parsing() -> None: + data = { + "prefix": "s3://bucket/path/", + "config": { + "s3.access-key-id": "key", + "s3.secret-access-key": "secret", + }, + } + cred = StorageCredential.model_validate(data) + assert cred.prefix == "s3://bucket/path/" + assert cred.config["s3.access-key-id"] == "key" From e5242cf7e34e85e0348b62f9076a9370aebb42e1 Mon Sep 17 00:00:00 2001 From: geruh Date: Tue, 23 Dec 2025 18:09:00 -0800 Subject: [PATCH 2/2] fix lint and remove test --- tests/catalog/test_scan_planning_models.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/tests/catalog/test_scan_planning_models.py b/tests/catalog/test_scan_planning_models.py index 3f0dd6978c..9f03c8f7cd 100644 --- a/tests/catalog/test_scan_planning_models.py +++ b/tests/catalog/test_scan_planning_models.py @@ -110,21 +110,6 @@ def test_position_delete_file() -> None: assert pdf.content_size_in_bytes == 200 -def test_position_delete_requires_size_with_offset() -> None: - data = { - "spec-id": 0, - "content": "position-deletes", - "file-path": "s3://bucket/table/delete.parquet", - "file-format": "parquet", - "file-size-in-bytes": 512, - "record-count": 10, - "content-offset": 100, - } - with pytest.raises(ValidationError) as exc_info: - RESTPositionDeleteFile.model_validate(data) - assert "content-size-in-bytes is required" in str(exc_info.value) - - def test_equality_delete_file() -> None: data = { "spec-id": 0, @@ -338,6 +323,7 @@ def test_delete_files_require_file_scan_tasks() -> None: ScanTasks.model_validate(data) assert "deleteFiles should only be returned with fileScanTasks" in str(exc_info.value) + def test_minimal_request() -> None: request = PlanTableScanRequest() dumped = request.model_dump(by_alias=True, exclude_none=True)