-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Fix data loss in partial variant shredding #15087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9032acd
5218092
04745aa
00b22dd
24bb7ea
f34042a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.variants.ShreddedObject; | ||
| import org.apache.iceberg.variants.ValueArray; | ||
| import org.apache.iceberg.variants.Variant; | ||
| import org.apache.iceberg.variants.VariantArray; | ||
|
|
@@ -52,6 +53,7 @@ | |
| import org.apache.parquet.hadoop.ParquetFileReader; | ||
| import org.apache.parquet.schema.LogicalTypeAnnotation; | ||
| import org.apache.parquet.schema.MessageType; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
| import org.junit.jupiter.params.provider.FieldSource; | ||
|
|
||
|
|
@@ -280,4 +282,53 @@ private static ValueArray array(VariantValue... values) { | |
|
|
||
| return arr; | ||
| } | ||
|
|
||
| @Test | ||
| public void testPartialShreddingWithShreddedObject() throws IOException { | ||
| // Test for issue #15086: partial shredding with ShreddedObject created using put() | ||
| // Create a ShreddedObject with multiple fields, then partially shred it | ||
| VariantMetadata metadata = Variants.metadata("id", "name", "city"); | ||
|
|
||
| // Create objects using ShreddedObject.put() instead of serialized buffers | ||
| List<Record> records = Lists.newArrayList(); | ||
| for (int i = 0; i < 3; i++) { | ||
| ShreddedObject obj = Variants.object(metadata); | ||
| obj.put("id", Variants.of(1000L + i)); | ||
| obj.put("name", Variants.of("user_" + i)); | ||
| obj.put("city", Variants.of("city_" + i)); | ||
|
|
||
| Variant variant = Variant.of(metadata, obj); | ||
| Record record = RECORD.copy("id", i, "var", variant); | ||
| records.add(record); | ||
| } | ||
|
|
||
| // Shredding function that only shreds the "id" field | ||
| VariantShreddingFunction partialShredding = | ||
| (id, name) -> { | ||
| VariantMetadata shreddedMetadata = Variants.metadata("id"); | ||
| ShreddedObject shreddedObject = Variants.object(shreddedMetadata); | ||
| shreddedObject.put("id", Variants.of(1234L)); | ||
| return ParquetVariantUtil.toParquetSchema(shreddedObject); | ||
| }; | ||
|
|
||
| // Write and read back | ||
| List<Record> actual = writeAndRead(partialShredding, records); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think an end-to-end test should live in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked into this a bit more and ended up producing a test for diff --git a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
index c1fba073cf..5d2bfc48fa 100644
--- a/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
+++ b/core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
@@ -153,12 +153,12 @@ public class ShreddedObject implements VariantObject {
private SerializationState(
VariantMetadata metadata,
VariantObject unshredded,
- Map<String, VariantValue> shreddedFields,
+ Map<String, VariantValue> shredded,
Set<String> removedFields) {
this.metadata = metadata;
// field ID size is the size needed to store the largest field ID in the data
this.fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize());
- this.shreddedFields = Maps.newHashMap(shreddedFields);
+ this.shreddedFields = Maps.newHashMap(shredded);
int totalDataSize = 0;
// get the unshredded field names and values as byte buffers
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
index 1ebb433a54..0cb043d420 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
@@ -213,6 +213,28 @@ public class TestShreddedObject {
.isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
}
+ @Test
+ public void testPartiallyShreddedUnserializedObjectSerializationMinimalBuffer() {
+ ShreddedObject partial = createUnserializedObject(FIELDS);
+ VariantMetadata metadata = partial.metadata();
+
+ // replace field c with a new value
+ partial.put("c", Variants.ofIsoDate("2024-10-12"));
+ partial.remove("b");
+
+ VariantValue value = roundTripMinimalBuffer(partial, metadata);
+
+ assertThat(value).isInstanceOf(SerializedObject.class);
+ SerializedObject actual = (SerializedObject) value;
+
+ assertThat(actual.get("a")).isInstanceOf(VariantPrimitive.class);
+ assertThat(actual.get("a").asPrimitive().get()).isEqualTo(34);
+ assertThat(actual.get("c")).isInstanceOf(VariantPrimitive.class);
+ assertThat(actual.get("c").type()).isEqualTo(PhysicalType.DATE);
+ assertThat(actual.get("c").asPrimitive().get())
+ .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
+ }
+
@Test
public void testPartiallyShreddedObjectSerializationLargeBuffer() {
ShreddedObject partial = createUnshreddedObject(FIELDS);
@@ -370,6 +392,12 @@ public class TestShreddedObject {
return Variants.value(metadata, slice);
}
+ private static ShreddedObject createUnserializedObject(Map<String, VariantValue> fields) {
+ ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(fields.keySet(), false);
+ VariantMetadata metadata = SerializedMetadata.from(metadataBuffer);
+ return new ShreddedObject(metadata, createShreddedObject(metadata, fields));
+ }
+
/** Creates a ShreddedObject with fields in its shredded map, using the given metadata */
private static ShreddedObject createShreddedObject(
VariantMetadata metadata, Map<String, VariantValue> fields) {Feel free to use that test case. It would also be nice to have a Parquet test like this one. If you end up keeping it, please use helper methods to create the Parquet schema (see
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I'll include your test case, and I'd also like to keep my test case. I changed the code from constructing the Parquet schema manually to using |
||
|
|
||
| // Verify all records match | ||
| assertThat(actual).hasSameSizeAs(records); | ||
| for (int i = 0; i < records.size(); i++) { | ||
| Record expected = records.get(i); | ||
| Record read = actual.get(i); | ||
|
|
||
| InternalTestHelpers.assertEquals(SCHEMA.asStruct(), expected, read); | ||
|
|
||
| // Also verify the variant object has all fields intact | ||
| Variant readVariant = (Variant) read.getField("var"); | ||
| VariantObject readObj = readVariant.value().asObject(); | ||
| assertThat(readObj.numFields()).isEqualTo(3); | ||
| assertThat(readObj.get("id").asPrimitive().get()).isEqualTo(1000L + i); | ||
| assertThat(readObj.get("name").asPrimitive().get()).isEqualTo("user_" + i); | ||
| assertThat(readObj.get("city").asPrimitive().get()).isEqualTo("city_" + i); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there a more direct way to test this in
TestShreddedObject? I think it's fine to have an end-to-end test, but I'd like to see a more direct one that doesn't go through Parquet.