Description
Related to #2775
In order to support scan planning for the REST catalog, I need to deserialize the file scan tasks that the API returns as JSON into DataFile and DeleteFile objects. The API returns JSON like this:
```json
{
  "plan-status": "completed",
  "delete-files": [
    {
      "spec-id": 0,
      "content": "position-deletes",
      "file-path": "s3://bucket/deletes.parquet",
      "file-format": "parquet",
      "partition": ["test"],
      "file-size-in-bytes": 1529,
      "record-count": 1,
      "column-sizes": {"keys": [2147483546], "values": [134]},
      "lower-bounds": {"keys": [2147483546], "values": ["73333A2F..."]},
      ...
    }
  ],
  "file-scan-tasks": [
    {
      "data-file": {
        "spec-id": 0,
        "content": "data",
        "file-path": "s3://bucket/data.parquet",
        ...
      },
      "delete-file-references": [0],
      "residual-filter": true
    }
  ]
}
```
The format is defined in the REST spec at https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L4337-L4389, and Java parses it via ContentFileParser.java.
Issue
The REST API representation differs from our internal representation (conversion sketch below):

- Partition is unbound: `["test"]` instead of a `Record`
- Maps are `{"keys": [...], "values": [...]}` instead of `{key: value}`
- Bounds are primitives (hex-encoded bytes)
- `content` is the string `position-deletes` instead of an enum int
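For illustration, a minimal sketch of the map and bounds conversions (helper names are hypothetical):

```python
def rest_map_to_dict(obj: dict | None) -> dict:
    # {"keys": [...], "values": [...]} -> {key: value}
    if not obj:
        return {}
    return dict(zip(obj["keys"], obj["values"]))

def rest_bounds_to_bytes(obj: dict | None) -> dict[int, bytes]:
    # Hex-encoded bound strings -> raw bytes, keyed by field id
    return {field_id: bytes.fromhex(hex_str) for field_id, hex_str in rest_map_to_dict(obj).items()}
```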
The current state of our Python `DataFile` (access pattern sketched below):

- Extends `Record` (for Avro compatibility)
- Uses positional array access (`_data[pos]`)
- Constructed via the `DataFile.from_args()` factory
- Tightly coupled to the Avro reader/writer via `StructProtocol`
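For context, a rough sketch of today's access pattern (field positions are illustrative, and I'm assuming `from_args` accepts keyword arguments):

```python
from pyiceberg.manifest import DataFile, DataFileContent

# Construct via the factory, then read/write fields by position
# through the StructProtocol interface:
df = DataFile.from_args(
    content=DataFileContent.DATA,
    file_path="s3://bucket/data.parquet",
)
assert df[1] == "s3://bucket/data.parquet"  # positional read (index 1 = file_path)
df[1] = "s3://bucket/moved.parquet"         # positional write
```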
Our `DataFile` isn't Pydantic, so we can't just call `DataFile.model_validate(json)` with validators to handle these conversions. Also, `DataFile` handles both data files and delete files via the `content` field, so it's really a content file.
Options
1. Translation layer (`RestContentFile`)
Create a separate Pydantic model that parses JSON with validators, then converts to DataFile.
Pros:
- Clean separation of concerns
- No risk to Avro code path
- Easy to test independently
Cons:
- Significant code duplication (all fields defined twice)
- Maintenance burden (keep two classes in sync)
- Conversion overhead
- I've prototyped this here and it's quite verbose
Example:

```python
from typing import Any

class RestContentFile(IcebergBaseModel):
    # All fields defined with validators...
    content: str          # validated and converted to our content enum
    partition: list[Any]  # unbound partition values

    def to_datafile(self) -> DataFile:
        # manual conversion logic...
        ...
```
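Usage would then be a two-step validate-and-convert, e.g. (assuming the REST response has already been parsed into a dict; `plan_response` is hypothetical):

```python
raw = plan_response["delete-files"][0]
delete_file = RestContentFile.model_validate(raw).to_datafile()
```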
2. Make `DataFile` Pydantic
Then it could parse the JSON directly with Pydantic.
The challenge is that `DataFile` is coupled to Avro for its fields and extends `Record`: the Avro reader constructs objects with positional args like `DataFile(None, None, ...)` and then fills fields by index. We'd need to converge these two worlds.
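For illustration, the naive convergence fails at class-creation time; this is the metaclass conflict mentioned in the recommendation below:

```python
# Sketch: Record and Pydantic's BaseModel bring incompatible metaclasses,
# so defining this class raises immediately:
class DataFile(Record, IcebergBaseModel):  # hypothetical dual inheritance
    ...

# TypeError: metaclass conflict: the metaclass of a derived class must be
# a (non-strict) subclass of the metaclasses of all its bases
```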
3. Manual parsing
Transform the raw JSON dict manually and construct `DataFile` without Pydantic (sketched after the list below).
Pros:
- No duplication
- Full control over conversion
- Simple, and doesn't touch the existing Avro functionality
Cons:
- Lose Pydantic's validation benefits
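A rough sketch of what this could look like, reusing the conversion helpers sketched earlier and again assuming `from_args` accepts keyword arguments:

```python
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat

_CONTENT = {
    "data": DataFileContent.DATA,
    "position-deletes": DataFileContent.POSITION_DELETES,
    "equality-deletes": DataFileContent.EQUALITY_DELETES,
}

def parse_content_file(obj: dict) -> DataFile:
    # Translate REST field names and encodings into our internal representation
    return DataFile.from_args(
        content=_CONTENT[obj["content"]],
        file_path=obj["file-path"],
        file_format=FileFormat(obj["file-format"].upper()),
        record_count=obj["record-count"],
        file_size_in_bytes=obj["file-size-in-bytes"],
        column_sizes=rest_map_to_dict(obj.get("column-sizes")),
        lower_bounds=rest_bounds_to_bytes(obj.get("lower-bounds")),
    )
```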
Recommendation
I'm leaning towards option 2, since it would reduce a lot of duplication. However, we can't directly extend both `Record` and `BaseModel` due to a metaclass conflict (illustrated above), but we can implement the same `StructProtocol` interface:
```python
from typing import Any, ClassVar

from pydantic import Field

class DataFile(IcebergBaseModel):
    content: DataFileContent = Field(default=DataFileContent.DATA)
    file_path: str = Field(alias="file-path")
    file_format: FileFormat = Field(alias="file-format")
    # ...remaining fields, with validators for the Pydantic conversion

    # Field order must match DATA_FILE_TYPE for Avro StructProtocol compatibility:
    # the Avro reader/writer accesses fields by position, not by name.
    _FIELD_ORDER: ClassVar[tuple[str, ...]] = ("content", "file_path", ...)

    def __new__(cls, *args: Any, **kwargs: Any) -> "DataFile":
        if args and not kwargs:
            # Positional args coming from the Avro reader: bypass validation
            return cls.model_construct(**dict(zip(cls._FIELD_ORDER, args)))
        return super().__new__(cls)

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if args and not kwargs:
            # __new__ already populated the instance via model_construct;
            # Pydantic's __init__ would reject positional args, so skip it.
            return
        super().__init__(**kwargs)

    # StructProtocol interface
    def __getitem__(self, pos: int) -> Any:
        return getattr(self, self._FIELD_ORDER[pos])

    def __setitem__(self, pos: int, value: Any) -> None:
        setattr(self, self._FIELD_ORDER[pos], value)

    def __len__(self) -> int:
        return len(self._FIELD_ORDER)
```
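With that shape in place, the REST path would reduce to direct validation, e.g. (`plan_response` is a hypothetical parsed response; field aliases assumed to match the spec):

```python
task = plan_response["file-scan-tasks"][0]
data_file = DataFile.model_validate(task["data-file"])
```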
But ultimately, I wanted to get input before making changes since this touches a core model. Open to suggestions on the approach.
cc: @Fokko @kevinjqliu @HonahX