diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java index c0c4c07b7e27..b965f32161da 100644 --- a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java +++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java @@ -18,8 +18,101 @@ */ package org.apache.iceberg; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + /** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */ public interface PartitionStatistics extends StructLike { + Types.NestedField EMPTY_PARTITION_FIELD = + Types.NestedField.required(1, "partition", Types.StructType.of()); + Types.NestedField SPEC_ID = Types.NestedField.required(2, "spec_id", Types.IntegerType.get()); + Types.NestedField DATA_RECORD_COUNT = + Types.NestedField.required(3, "data_record_count", Types.LongType.get()); + Types.NestedField DATA_FILE_COUNT = + Types.NestedField.required(4, "data_file_count", Types.IntegerType.get()); + Types.NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = + Types.NestedField.required(5, "total_data_file_size_in_bytes", Types.LongType.get()); + Types.NestedField POSITION_DELETE_RECORD_COUNT = + Types.NestedField.optional(6, "position_delete_record_count", Types.LongType.get()); + Types.NestedField POSITION_DELETE_FILE_COUNT = + Types.NestedField.optional(7, "position_delete_file_count", Types.IntegerType.get()); + Types.NestedField EQUALITY_DELETE_RECORD_COUNT = + Types.NestedField.optional(8, "equality_delete_record_count", Types.LongType.get()); + Types.NestedField EQUALITY_DELETE_FILE_COUNT = + Types.NestedField.optional(9, "equality_delete_file_count", Types.IntegerType.get()); + Types.NestedField TOTAL_RECORD_COUNT = + Types.NestedField.optional(10, "total_record_count", Types.LongType.get()); + Types.NestedField LAST_UPDATED_AT = + Types.NestedField.optional(11, "last_updated_at", Types.LongType.get()); + Types.NestedField LAST_UPDATED_SNAPSHOT_ID = + Types.NestedField.optional(12, "last_updated_snapshot_id", Types.LongType.get()); + // Using default value for v3 field to support v3 reader reading file written by v2 + Types.NestedField DV_COUNT = + Types.NestedField.required("dv_count") + .withId(13) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(0)) + .withWriteDefault(Literal.of(0)) + .build(); + + static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) { + Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + Preconditions.checkState(formatVersion > 0, "Invalid format version: %d", formatVersion); + + if (formatVersion <= 2) { + return v2Schema(unifiedPartitionType); + } + + return v3Schema(unifiedPartitionType); + } + + private static Schema v2Schema(Types.StructType unifiedPartitionType) { + return new Schema( + Types.NestedField.required( + EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID); + } + + private static Schema v3Schema(Types.StructType unifiedPartitionType) { + return new Schema( + Types.NestedField.required( + EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + Types.NestedField.required( + POSITION_DELETE_RECORD_COUNT.fieldId(), + POSITION_DELETE_RECORD_COUNT.name(), + Types.LongType.get()), + Types.NestedField.required( + POSITION_DELETE_FILE_COUNT.fieldId(), + POSITION_DELETE_FILE_COUNT.name(), + Types.IntegerType.get()), + Types.NestedField.required( + EQUALITY_DELETE_RECORD_COUNT.fieldId(), + EQUALITY_DELETE_RECORD_COUNT.name(), + Types.LongType.get()), + Types.NestedField.required( + EQUALITY_DELETE_FILE_COUNT.fieldId(), + EQUALITY_DELETE_FILE_COUNT.name(), + Types.IntegerType.get()), + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID, + DV_COUNT); + } /* The positions of each statistics within the full schema of partition statistics. */ int PARTITION_POSITION = 0; diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java index 075a1a85d394..1316f015adb3 100644 --- a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java +++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java @@ -72,7 +72,7 @@ public CloseableIterable scan() { } Types.StructType partitionType = Partitioning.partitionType(table); - Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table)); + Schema schema = PartitionStatistics.schema(partitionType, TableUtil.formatVersion(table)); FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path()); Preconditions.checkNotNull( diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index acf4ca3c0ce9..89aabfcc6af2 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -68,30 +68,101 @@ private PartitionStatsHandler() {} private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class); // schema of the partition stats file as per spec - public static final int PARTITION_FIELD_ID = 1; - public static final String PARTITION_FIELD_NAME = "partition"; + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD} + */ + @Deprecated public static final int PARTITION_FIELD_ID = 1; + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD} + */ + @Deprecated public static final String PARTITION_FIELD_NAME = "partition"; + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#SPEC_ID} + */ + @Deprecated public static final NestedField SPEC_ID = NestedField.required(2, "spec_id", IntegerType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_RECORD_COUNT} + */ + @Deprecated public static final NestedField DATA_RECORD_COUNT = NestedField.required(3, "data_record_count", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_FILE_COUNT} + */ + @Deprecated public static final NestedField DATA_FILE_COUNT = NestedField.required(4, "data_file_count", IntegerType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStatistics#TOTAL_DATA_FILE_SIZE_IN_BYTES} + */ + @Deprecated public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = NestedField.required(5, "total_data_file_size_in_bytes", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStatistics#POSITION_DELETE_RECORD_COUNT} + */ + @Deprecated public static final NestedField POSITION_DELETE_RECORD_COUNT = NestedField.optional(6, "position_delete_record_count", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStatistics#POSITION_DELETE_FILE_COUNT} + */ + @Deprecated public static final NestedField POSITION_DELETE_FILE_COUNT = NestedField.optional(7, "position_delete_file_count", IntegerType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStatistics#EQUALITY_DELETE_RECORD_COUNT} + */ + @Deprecated public static final NestedField EQUALITY_DELETE_RECORD_COUNT = NestedField.optional(8, "equality_delete_record_count", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStatistics#EQUALITY_DELETE_FILE_COUNT} + */ + @Deprecated public static final NestedField EQUALITY_DELETE_FILE_COUNT = NestedField.optional(9, "equality_delete_file_count", IntegerType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#TOTAL_RECORD_COUNT} + */ + @Deprecated public static final NestedField TOTAL_RECORD_COUNT = NestedField.optional(10, "total_record_count", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_AT} + */ + @Deprecated public static final NestedField LAST_UPDATED_AT = NestedField.optional(11, "last_updated_at", LongType.get()); + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_SNAPSHOT_ID} + */ + @Deprecated public static final NestedField LAST_UPDATED_SNAPSHOT_ID = NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); - // Using default value for v3 field to support v3 reader reading file written by v2 + + /** + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DV_COUNT} + */ + @Deprecated public static final NestedField DV_COUNT = NestedField.required("dv_count") .withId(13) @@ -107,7 +178,10 @@ private PartitionStatsHandler() {} * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. + * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#schema(StructType, int)} + * instead. */ + @Deprecated public static Schema schema(StructType unifiedPartitionType, int formatVersion) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); Preconditions.checkState( @@ -243,7 +317,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long return writePartitionStatsFile( table, snapshot.snapshotId(), - schema(partitionType, TableUtil.formatVersion(table)), + PartitionStatistics.schema(partitionType, TableUtil.formatVersion(table)), sortedStats); } diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java index e48d26a71f97..321d6fedad5a 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java @@ -18,7 +18,7 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID; +import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -217,10 +217,10 @@ public void testScanPartitionStatsForCurrentSnapshot() throws Exception { } Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2); Types.StructType partitionType = - recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType(); computeAndValidatePartitionStats( testTable, testTable.currentSnapshot().snapshotId(), @@ -379,10 +379,10 @@ public void testScanPartitionStatsForOlderSnapshot() throws Exception { testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2); Types.StructType partitionType = - recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType(); computeAndValidatePartitionStats( testTable, diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java index 5324db50f5da..518c1b3cde81 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java @@ -18,18 +18,18 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT; -import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID; -import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME; -import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID; -import static org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES; -import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatistics.DATA_FILE_COUNT; +import static org.apache.iceberg.PartitionStatistics.DATA_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD; +import static org.apache.iceberg.PartitionStatistics.EQUALITY_DELETE_FILE_COUNT; +import static org.apache.iceberg.PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatistics.LAST_UPDATED_AT; +import static org.apache.iceberg.PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID; +import static org.apache.iceberg.PartitionStatistics.POSITION_DELETE_FILE_COUNT; +import static org.apache.iceberg.PartitionStatistics.POSITION_DELETE_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatistics.SPEC_ID; +import static org.apache.iceberg.PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES; +import static org.apache.iceberg.PartitionStatistics.TOTAL_RECORD_COUNT; import static org.apache.iceberg.types.Types.NestedField.optional; import java.io.File; @@ -60,7 +60,7 @@ public abstract class PartitionStatisticsTestBase { protected Schema invalidOldSchema(Types.StructType unifiedPartitionType) { // field ids starts from 0 instead of 1 return new Schema( - Types.NestedField.required(0, PARTITION_FIELD_NAME, unifiedPartitionType), + Types.NestedField.required(0, EMPTY_PARTITION_FIELD.name(), unifiedPartitionType), Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()), Types.NestedField.required(2, DATA_RECORD_COUNT.name(), Types.LongType.get()), Types.NestedField.required(3, DATA_FILE_COUNT.name(), Types.IntegerType.get()), diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index d5442e209dcc..7b7e85bfffce 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -18,7 +18,7 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID; +import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; @@ -159,10 +159,11 @@ public void testAllDatatypePartitionWriting() throws Exception { fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); + Schema dataSchema = PartitionStatistics.schema(partitionSchema, formatVersion); PartitionData partitionData = - new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); + new PartitionData( + dataSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType()); partitionData.set(0, true); partitionData.set(1, 42); partitionData.set(2, 42L); @@ -228,12 +229,12 @@ public void testOptionalFieldsWriting() throws Exception { fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); + Schema dataSchema = PartitionStatistics.schema(partitionSchema, formatVersion); ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { PartitionStatistics stats = - randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); + randomStats(dataSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType()); stats.set(PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION, null); stats.set(PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION, null); stats.set(PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION, null); @@ -330,10 +331,10 @@ public void testPartitionStats() throws Exception { } Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2); Types.StructType partitionType = - recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType(); computeAndValidatePartitionStats( testTable, recordSchema, @@ -409,7 +410,7 @@ public void testPartitionStats() throws Exception { testTable.newRowDelta().addDeletes(dv).commit(); Snapshot snapshot4 = testTable.currentSnapshot(); - recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 3); + recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 3); computeAndValidatePartitionStats( testTable, @@ -600,7 +601,7 @@ public void testReadingStatsWithInvalidSchema() throws Exception { Table testTable = TestTables.create(tempDir("old_schema"), "old_schema", SCHEMA, spec, 2, fileFormatProperty); Types.StructType partitionType = Partitioning.partitionType(testTable); - Schema newSchema = PartitionStatsHandler.schema(partitionType, 2); + Schema newSchema = PartitionStatistics.schema(partitionType, 2); Schema oldSchema = invalidOldSchema(partitionType); PartitionStatisticsFile invalidStatisticsFile = @@ -681,7 +682,7 @@ public void testV2toV3SchemaEvolution() throws Exception { Types.StructType partitionSchema = Partitioning.partitionType(testTable); // read with v2 schema - Schema v2Schema = PartitionStatsHandler.schema(partitionSchema, 2); + Schema v2Schema = PartitionStatistics.schema(partitionSchema, 2); List partitionStatsV2; try (CloseableIterable recordIterator = PartitionStatsHandler.readPartitionStatsFile( @@ -690,7 +691,7 @@ public void testV2toV3SchemaEvolution() throws Exception { } // read with v3 schema - Schema v3Schema = PartitionStatsHandler.schema(partitionSchema, 3); + Schema v3Schema = PartitionStatistics.schema(partitionSchema, 3); List partitionStatsV3; try (CloseableIterable recordIterator = PartitionStatsHandler.readPartitionStatsFile(