diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 404c19aef224..bea1b7d9108f 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.40.0"
-comet = "0.8.1"
+comet = "0.10.0"
 datasketches = "6.2.0"
 delta-standalone = "3.3.2"
 delta-spark = "3.3.2"
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 0626d0b43985..bc8c29d1eb43 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -151,7 +151,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  private boolean useParquetBatchReads() {
+  protected boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
@@ -175,7 +175,7 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..967b0d41d081 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -95,7 +96,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
 
   private static final String NDV_KEY = "ndv";
@@ -351,4 +352,10 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
+  }
 }
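For context on how this contract is consumed: Comet's planner checks, for each DSv2 scan, whether the scan implementation opts in to native batch reads. The sketch below is a hypothetical illustration of that check, not code from this PR or from Comet itself; only the SupportsComet interface and its isCometEnabled() method are taken from the diff above, and the class and method names here are assumptions.

import org.apache.comet.parquet.SupportsComet;
import org.apache.spark.sql.connector.read.Scan;

final class CometScanCheck {
  // A scan qualifies for a Comet-native read only if it implements the
  // marker interface and isCometEnabled() reports that Comet batch reads
  // will actually be used for this particular scan.
  static boolean canUseCometScan(Scan scan) {
    return scan instanceof SupportsComet && ((SupportsComet) scan).isCometEnabled();
  }
}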
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 0626d0b43985..74512504458a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -151,7 +151,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  private boolean useParquetBatchReads() {
+  protected boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
@@ -175,19 +175,19 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType()
         && !field.type().typeId().equals(Type.TypeID.UUID)
         && field.fieldId() != MetadataColumns.ROW_ID.fieldId()
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..967b0d41d081 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -95,7 +96,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
 
   private static final String NDV_KEY = "ndv";
@@ -351,4 +352,10 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
+  }
 }
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index 825bd17010ec..8f26055d239b 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -80,7 +80,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'org.roaringbitmap'
     }
 
-    compileOnly "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
 
     implementation libs.parquet.column
     implementation libs.parquet.hadoop
@@ -190,7 +190,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation libs.avro.avro
     testImplementation libs.parquet.hadoop
     testImplementation libs.awaitility
-    testImplementation "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
     testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
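For reference, a minimal sketch of how a user would exercise this code path end to end. The reader-type property is believed to come from Iceberg's SparkSQLProperties ("spark.sql.iceberg.parquet.reader-type") and the plugin class name from the Comet documentation; both should be checked against the versions in use, and the catalog/table names are placeholders.

import org.apache.spark.sql.SparkSession;

public class CometReadExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            // Load Comet on the driver and executors (per the Comet docs).
            .config("spark.plugins", "org.apache.spark.CometPlugin")
            // Route Iceberg's vectorized Parquet reads through Comet; Parquet
            // vectorization itself is enabled by default.
            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
            .getOrCreate();

    // Placeholder table; assumes an Iceberg catalog named "demo" is configured.
    spark.sql("SELECT * FROM demo.db.events").show();
  }
}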
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 0626d0b43985..a15f0ca75f4c 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -175,7 +175,7 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..967b0d41d081 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -95,7 +96,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
 
   private static final String NDV_KEY = "ndv";
@@ -351,4 +352,10 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
+  }
 }
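One observation on the override added in all three versions: isCometEnabled() casts the result of toBatch() to SparkBatch unconditionally. That is safe as long as every SparkScan subclass builds a SparkBatch, as the current code does, but a guarded variant (a sketch, not part of this change) would degrade gracefully if a different Batch implementation ever appeared:

  @Override
  public boolean isCometEnabled() {
    org.apache.spark.sql.connector.read.Batch batch = this.toBatch();
    // Report "not Comet" instead of throwing ClassCastException for
    // any Batch implementation other than SparkBatch.
    return batch instanceof SparkBatch && ((SparkBatch) batch).useCometBatchReads();
  }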