diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 9bb2b713439d..c03c3323fe8b 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -24,6 +24,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Pattern;
@@ -37,6 +38,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.variants.PhysicalType;
@@ -737,11 +739,28 @@ private static String sanitizeVariantValue(
 
   private static PartitionSpec identitySpec(Schema schema, int... ids) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+    Map<Integer, Integer> idToParent = TypeUtil.indexParents(schema.asStruct());
 
     for (int id : ids) {
-      specBuilder.identity(schema.findColumnName(id));
+      String columnName = schema.findColumnName(id);
+      if (columnName != null && canBePartitionSource(id, schema, idToParent)) {
+        specBuilder.identity(columnName);
+      }
     }
 
     return specBuilder.build();
   }
+
+  private static boolean canBePartitionSource(
+      int fieldId, Schema schema, Map<Integer, Integer> idToParent) {
+    Integer parentId = idToParent.get(fieldId);
+    while (parentId != null) {
+      Type parentType = schema.findType(parentId);
+      if (parentType == null || !parentType.isStructType()) {
+        return false;
+      }
+      parentId = idToParent.get(parentId);
+    }
+    return true;
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0ff3a949ae51..006b1edf1ce7 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,37 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.4"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value: BIGINT, count: INT>>) "
+            + "USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index feafaff27b45..dd2bd051cd8a 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,37 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("3.5"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value: BIGINT, count: INT>>) "
+            + "USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 006379adda56..e9bddcc09d12 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,37 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value: BIGINT, count: INT>>) "
+            + "USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 311cf763eeef..63f3303ea663 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -245,6 +245,37 @@ public void testRewriteSummary() throws Exception {
         EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.1"));
   }
 
+  @TestTemplate
+  public void testRewritePositionDeletesWithArrayColumns() throws Exception {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value: BIGINT, count: INT>>) "
+            + "USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
+        tableName);
+
+    sql(
+        "INSERT INTO %s VALUES "
+            + "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+            + "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+            + "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+            + "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+            + "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+            + "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
+        tableName);
+
+    sql("DELETE FROM %s WHERE id = 1", tableName);
+    sql("DELETE FROM %s WHERE id = 2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);
+
+    sql(
+        "CALL %s.system.rewrite_position_delete_files("
+            + "table => '%s',"
+            + "options => map('rewrite-all','true'))",
+        catalogName, tableIdent);
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
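Note on the core change, outside the diff itself: identitySpec() now indexes parent field ids with TypeUtil.indexParents() and only adds an identity partition for a field whose ancestors are all structs, so ids that resolve to fields nested under a list or map element are skipped instead of producing an invalid spec. The sketch below is a minimal, hypothetical illustration of that ancestry walk; the class name, schema, and field ids are invented for the example, and the helper simply mirrors the private canBePartitionSource method added in the patch rather than calling it (it is not public API).

// Illustrative sketch only; assumes the Iceberg API (Schema, Types, TypeUtil) on the classpath.
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class PartitionSourceCheckSketch {
  public static void main(String[] args) {
    // Hypothetical schema shaped like the test table: a top-level id column and a
    // struct field ("value") nested inside a list element.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(
                2,
                "items",
                Types.ListType.ofRequired(
                    3,
                    Types.StructType.of(
                        Types.NestedField.required(4, "value", Types.LongType.get()),
                        Types.NestedField.required(5, "count", Types.IntegerType.get())))));

    // Child field id -> parent field id, as used by the patched identitySpec().
    Map<Integer, Integer> idToParent = TypeUtil.indexParents(schema.asStruct());

    // Top-level field: no non-struct ancestor, so it can back an identity partition.
    System.out.println(canBePartitionSource(1, schema, idToParent)); // true
    // items.element.value: one of its ancestors is a list, so it is skipped.
    System.out.println(canBePartitionSource(4, schema, idToParent)); // false
  }

  // Mirrors the ancestry walk added to ExpressionUtil in this change.
  private static boolean canBePartitionSource(
      int fieldId, Schema schema, Map<Integer, Integer> idToParent) {
    Integer parentId = idToParent.get(fieldId);
    while (parentId != null) {
      Type parentType = schema.findType(parentId);
      if (parentType == null || !parentType.isStructType()) {
        // An ancestor is a list or map (or unknown), so identity partitioning is not possible.
        return false;
      }
      parentId = idToParent.get(parentId);
    }
    return true;
  }
}

Because the walk climbs all the way to the root rather than checking only the immediate parent, fields inside nested structs still qualify, while anything reached through a list or map element is rejected; that is what lets the rewrite_position_delete_files tests above pass on tables with ARRAY<STRUCT<...>> columns.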