Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,101 @@
*/
package org.apache.iceberg;

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;

/** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */
public interface PartitionStatistics extends StructLike {
// Field definitions (with spec-mandated field ids) for the partition statistics
// file schema. The "partition" column's struct type is table-specific, so it is
// declared here with an empty struct and rebuilt with the real unified partition
// type in schema(StructType, int).
Types.NestedField EMPTY_PARTITION_FIELD =
    Types.NestedField.required(1, "partition", Types.StructType.of());
Types.NestedField SPEC_ID = Types.NestedField.required(2, "spec_id", Types.IntegerType.get());
Types.NestedField DATA_RECORD_COUNT =
    Types.NestedField.required(3, "data_record_count", Types.LongType.get());
Types.NestedField DATA_FILE_COUNT =
    Types.NestedField.required(4, "data_file_count", Types.IntegerType.get());
Types.NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
    Types.NestedField.required(5, "total_data_file_size_in_bytes", Types.LongType.get());
// The delete/total/last-updated columns below are optional in the v2 schema;
// v3 redeclares the delete counts as required (see v3Schema).
Types.NestedField POSITION_DELETE_RECORD_COUNT =
    Types.NestedField.optional(6, "position_delete_record_count", Types.LongType.get());
Types.NestedField POSITION_DELETE_FILE_COUNT =
    Types.NestedField.optional(7, "position_delete_file_count", Types.IntegerType.get());
Types.NestedField EQUALITY_DELETE_RECORD_COUNT =
    Types.NestedField.optional(8, "equality_delete_record_count", Types.LongType.get());
Types.NestedField EQUALITY_DELETE_FILE_COUNT =
    Types.NestedField.optional(9, "equality_delete_file_count", Types.IntegerType.get());
Types.NestedField TOTAL_RECORD_COUNT =
    Types.NestedField.optional(10, "total_record_count", Types.LongType.get());
Types.NestedField LAST_UPDATED_AT =
    Types.NestedField.optional(11, "last_updated_at", Types.LongType.get());
Types.NestedField LAST_UPDATED_SNAPSHOT_ID =
    Types.NestedField.optional(12, "last_updated_snapshot_id", Types.LongType.get());
// Using default value for v3 field to support v3 reader reading file written by v2
Types.NestedField DV_COUNT =
    Types.NestedField.required("dv_count")
        .withId(13)
        .ofType(Types.IntegerType.get())
        .withInitialDefault(Literal.of(0))
        .withWriteDefault(Literal.of(0))
        .build();

/**
 * Returns the partition statistics file schema for the given table format version.
 *
 * @param unifiedPartitionType unified partition struct type across all specs; must be non-empty
 * @param formatVersion table format version (must be positive); versions 1-2 share one schema,
 *     version 3+ uses the schema with required delete counts and the dv_count column
 * @return schema of the partition statistics file for that format version
 */
static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) {
  Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
  // Guava Preconditions only substitutes %s placeholders; a %d would be left
  // unexpanded in the exception message, so use %s for the version argument.
  Preconditions.checkState(formatVersion > 0, "Invalid format version: %s", formatVersion);

  if (formatVersion <= 2) {
    return v2Schema(unifiedPartitionType);
  }

  return v3Schema(unifiedPartitionType);
}

/**
 * Builds the partition statistics schema used by format version 1 and 2 tables.
 *
 * <p>All columns come from the shared field constants, except the leading partition column,
 * which is rebuilt around the table-specific unified partition struct type.
 */
private static Schema v2Schema(Types.StructType unifiedPartitionType) {
  // Reuse the constant's id and name but swap in the real partition struct type.
  Types.NestedField partitionField =
      Types.NestedField.required(
          EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType);

  return new Schema(
      partitionField,
      SPEC_ID,
      DATA_RECORD_COUNT,
      DATA_FILE_COUNT,
      TOTAL_DATA_FILE_SIZE_IN_BYTES,
      POSITION_DELETE_RECORD_COUNT,
      POSITION_DELETE_FILE_COUNT,
      EQUALITY_DELETE_RECORD_COUNT,
      EQUALITY_DELETE_FILE_COUNT,
      TOTAL_RECORD_COUNT,
      LAST_UPDATED_AT,
      LAST_UPDATED_SNAPSHOT_ID);
}

/**
 * Builds the partition statistics schema used by format version 3+ tables.
 *
 * <p>Identical to the v2 schema except the delete record/file count columns are required and a
 * trailing dv_count column (with a default of 0 for files written by v2) is appended.
 */
private static Schema v3Schema(Types.StructType unifiedPartitionType) {
  return new Schema(
      Types.NestedField.required(
          EMPTY_PARTITION_FIELD.fieldId(), EMPTY_PARTITION_FIELD.name(), unifiedPartitionType),
      SPEC_ID,
      DATA_RECORD_COUNT,
      DATA_FILE_COUNT,
      TOTAL_DATA_FILE_SIZE_IN_BYTES,
      // v3 promotes the delete-count columns to required; derive them from the shared
      // constants via asRequired() instead of re-declaring id/name/type by hand, so the
      // two schema variants cannot drift apart.
      POSITION_DELETE_RECORD_COUNT.asRequired(),
      POSITION_DELETE_FILE_COUNT.asRequired(),
      EQUALITY_DELETE_RECORD_COUNT.asRequired(),
      EQUALITY_DELETE_FILE_COUNT.asRequired(),
      TOTAL_RECORD_COUNT,
      LAST_UPDATED_AT,
      LAST_UPDATED_SNAPSHOT_ID,
      DV_COUNT);
}

/* The positions of each statistics within the full schema of partition statistics. */
int PARTITION_POSITION = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public CloseableIterable<PartitionStatistics> scan() {
}

Types.StructType partitionType = Partitioning.partitionType(table);
Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table));
Schema schema = PartitionStatistics.schema(partitionType, TableUtil.formatVersion(table));

FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path());
Preconditions.checkNotNull(
Expand Down
82 changes: 78 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,101 @@ private PartitionStatsHandler() {}
private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);

// schema of the partition stats file as per spec
public static final int PARTITION_FIELD_ID = 1;
public static final String PARTITION_FIELD_NAME = "partition";
/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD}
*/
@Deprecated public static final int PARTITION_FIELD_ID = 1;

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD}
*/
@Deprecated public static final String PARTITION_FIELD_NAME = "partition";

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#SPEC_ID}
*/
@Deprecated
public static final NestedField SPEC_ID = NestedField.required(2, "spec_id", IntegerType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_RECORD_COUNT}
*/
@Deprecated
public static final NestedField DATA_RECORD_COUNT =
NestedField.required(3, "data_record_count", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_FILE_COUNT}
*/
@Deprecated
public static final NestedField DATA_FILE_COUNT =
NestedField.required(4, "data_file_count", IntegerType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link
* PartitionStatistics#TOTAL_DATA_FILE_SIZE_IN_BYTES}
*/
@Deprecated
public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
NestedField.required(5, "total_data_file_size_in_bytes", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link
* PartitionStatistics#POSITION_DELETE_RECORD_COUNT}
*/
@Deprecated
public static final NestedField POSITION_DELETE_RECORD_COUNT =
NestedField.optional(6, "position_delete_record_count", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link
* PartitionStatistics#POSITION_DELETE_FILE_COUNT}
*/
@Deprecated
public static final NestedField POSITION_DELETE_FILE_COUNT =
NestedField.optional(7, "position_delete_file_count", IntegerType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link
* PartitionStatistics#EQUALITY_DELETE_RECORD_COUNT}
*/
@Deprecated
public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
NestedField.optional(8, "equality_delete_record_count", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link
* PartitionStatistics#EQUALITY_DELETE_FILE_COUNT}
*/
@Deprecated
public static final NestedField EQUALITY_DELETE_FILE_COUNT =
NestedField.optional(9, "equality_delete_file_count", IntegerType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#TOTAL_RECORD_COUNT}
*/
@Deprecated
public static final NestedField TOTAL_RECORD_COUNT =
NestedField.optional(10, "total_record_count", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_AT}
*/
@Deprecated
public static final NestedField LAST_UPDATED_AT =
NestedField.optional(11, "last_updated_at", LongType.get());

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_SNAPSHOT_ID}
*/
@Deprecated
public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
// Using default value for v3 field to support v3 reader reading file written by v2

/**
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DV_COUNT}
*/
@Deprecated
public static final NestedField DV_COUNT =
NestedField.required("dv_count")
.withId(13)
Expand All @@ -107,7 +178,10 @@ private PartitionStatsHandler() {}
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
* Partitioning#partitionType(Table)}.
* @return a schema that corresponds to the provided unified partition type.
* @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#schema(StructType, int)}
* instead.
*/
@Deprecated
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(
Expand Down Expand Up @@ -243,7 +317,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long
return writePartitionStatsFile(
table,
snapshot.snapshotId(),
schema(partitionType, TableUtil.formatVersion(table)),
PartitionStatistics.schema(partitionType, TableUtil.formatVersion(table)),
sortedStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -217,10 +217,10 @@ public void testScanPartitionStatsForCurrentSnapshot() throws Exception {
}

Snapshot snapshot1 = testTable.currentSnapshot();
Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2);

Types.StructType partitionType =
recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType();
computeAndValidatePartitionStats(
testTable,
testTable.currentSnapshot().snapshotId(),
Expand Down Expand Up @@ -379,10 +379,10 @@ public void testScanPartitionStatsForOlderSnapshot() throws Exception {

testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();

Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2);

Types.StructType partitionType =
recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType();

computeAndValidatePartitionStats(
testTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME;
import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID;
import static org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatistics.DATA_FILE_COUNT;
import static org.apache.iceberg.PartitionStatistics.DATA_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD;
import static org.apache.iceberg.PartitionStatistics.EQUALITY_DELETE_FILE_COUNT;
import static org.apache.iceberg.PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatistics.LAST_UPDATED_AT;
import static org.apache.iceberg.PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID;
import static org.apache.iceberg.PartitionStatistics.POSITION_DELETE_FILE_COUNT;
import static org.apache.iceberg.PartitionStatistics.POSITION_DELETE_RECORD_COUNT;
import static org.apache.iceberg.PartitionStatistics.SPEC_ID;
import static org.apache.iceberg.PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES;
import static org.apache.iceberg.PartitionStatistics.TOTAL_RECORD_COUNT;
import static org.apache.iceberg.types.Types.NestedField.optional;

import java.io.File;
Expand Down Expand Up @@ -60,7 +60,7 @@ public abstract class PartitionStatisticsTestBase {
protected Schema invalidOldSchema(Types.StructType unifiedPartitionType) {
// field ids starts from 0 instead of 1
return new Schema(
Types.NestedField.required(0, PARTITION_FIELD_NAME, unifiedPartitionType),
Types.NestedField.required(0, EMPTY_PARTITION_FIELD.name(), unifiedPartitionType),
Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()),
Types.NestedField.required(2, DATA_RECORD_COUNT.name(), Types.LongType.get()),
Types.NestedField.required(3, DATA_FILE_COUNT.name(), Types.IntegerType.get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
import static org.apache.iceberg.PartitionStatistics.EMPTY_PARTITION_FIELD;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -159,10 +159,11 @@ public void testAllDatatypePartitionWriting() throws Exception {
fileFormatProperty);

Types.StructType partitionSchema = Partitioning.partitionType(testTable);
Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion);
Schema dataSchema = PartitionStatistics.schema(partitionSchema, formatVersion);

PartitionData partitionData =
new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
new PartitionData(
dataSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType());
partitionData.set(0, true);
partitionData.set(1, 42);
partitionData.set(2, 42L);
Expand Down Expand Up @@ -228,12 +229,12 @@ public void testOptionalFieldsWriting() throws Exception {
fileFormatProperty);

Types.StructType partitionSchema = Partitioning.partitionType(testTable);
Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion);
Schema dataSchema = PartitionStatistics.schema(partitionSchema, formatVersion);

ImmutableList.Builder<PartitionStatistics> partitionListBuilder = ImmutableList.builder();
for (int i = 0; i < 5; i++) {
PartitionStatistics stats =
randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
randomStats(dataSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType());
stats.set(PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION, null);
stats.set(PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION, null);
stats.set(PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION, null);
Expand Down Expand Up @@ -330,10 +331,10 @@ public void testPartitionStats() throws Exception {
}

Snapshot snapshot1 = testTable.currentSnapshot();
Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
Schema recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 2);

Types.StructType partitionType =
recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
recordSchema.findField(EMPTY_PARTITION_FIELD.fieldId()).type().asStructType();
computeAndValidatePartitionStats(
testTable,
recordSchema,
Expand Down Expand Up @@ -409,7 +410,7 @@ public void testPartitionStats() throws Exception {
testTable.newRowDelta().addDeletes(dv).commit();
Snapshot snapshot4 = testTable.currentSnapshot();

recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 3);
recordSchema = PartitionStatistics.schema(Partitioning.partitionType(testTable), 3);

computeAndValidatePartitionStats(
testTable,
Expand Down Expand Up @@ -600,7 +601,7 @@ public void testReadingStatsWithInvalidSchema() throws Exception {
Table testTable =
TestTables.create(tempDir("old_schema"), "old_schema", SCHEMA, spec, 2, fileFormatProperty);
Types.StructType partitionType = Partitioning.partitionType(testTable);
Schema newSchema = PartitionStatsHandler.schema(partitionType, 2);
Schema newSchema = PartitionStatistics.schema(partitionType, 2);
Schema oldSchema = invalidOldSchema(partitionType);

PartitionStatisticsFile invalidStatisticsFile =
Expand Down Expand Up @@ -681,7 +682,7 @@ public void testV2toV3SchemaEvolution() throws Exception {
Types.StructType partitionSchema = Partitioning.partitionType(testTable);

// read with v2 schema
Schema v2Schema = PartitionStatsHandler.schema(partitionSchema, 2);
Schema v2Schema = PartitionStatistics.schema(partitionSchema, 2);
List<PartitionStats> partitionStatsV2;
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
Expand All @@ -690,7 +691,7 @@ public void testV2toV3SchemaEvolution() throws Exception {
}

// read with v3 schema
Schema v3Schema = PartitionStatsHandler.schema(partitionSchema, 3);
Schema v3Schema = PartitionStatistics.schema(partitionSchema, 3);
List<PartitionStats> partitionStatsV3;
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
Expand Down