From cfcde40a829a05650ad6b4d16edcb3cfd0322b96 Mon Sep 17 00:00:00 2001
From: hsiang-c
Date: Tue, 12 Aug 2025 11:41:15 -0700
Subject: [PATCH 1/4] Enable Comet Scan in Iceberg Spark

---
 gradle/libs.versions.toml                          |  2 +-
 .../apache/iceberg/spark/source/SparkBatch.java    |  2 +-
 .../org/apache/iceberg/spark/source/SparkScan.java | 14 +++++++++++++-
 .../apache/iceberg/spark/source/SparkBatch.java    |  2 +-
 .../org/apache/iceberg/spark/source/SparkScan.java | 14 +++++++++++++-
 spark/v4.0/build.gradle                            |  4 ++--
 .../apache/iceberg/spark/source/SparkBatch.java    |  2 +-
 .../org/apache/iceberg/spark/source/SparkScan.java | 14 +++++++++++++-
 8 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9896f366d0e8..2e15f1545dbc 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.40.0"
-comet = "0.8.1"
+comet = "0.10.0-SNAPSHOT"
 datasketches = "6.2.0"
 delta-standalone = "3.3.2"
 delta-spark = "3.3.2"
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index db4230f2f6f3..99e827c69cb9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -148,7 +148,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  private boolean useParquetBatchReads() {
+  protected boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..4295a64438c3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -37,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -95,7 +97,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
   private static final String NDV_KEY = "ndv";
 
@@ -351,4 +353,14 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
+      SparkBatch batch = (SparkBatch) this.toBatch();
+      return batch.useParquetBatchReads();
+    }
+
+    return false;
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index db4230f2f6f3..99e827c69cb9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -148,7 +148,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  private boolean useParquetBatchReads() {
+  protected boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..4295a64438c3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -37,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -95,7 +97,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
   private static final String NDV_KEY = "ndv";
 
@@ -351,4 +353,14 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
+      SparkBatch batch = (SparkBatch) this.toBatch();
+      return batch.useParquetBatchReads();
+    }
+
+    return false;
+  }
 }
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index 9c7ea06f9938..d190c9955ecc 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -80,7 +80,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'org.roaringbitmap'
     }
 
-    compileOnly "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
 
     implementation libs.parquet.column
     implementation libs.parquet.hadoop
@@ -189,7 +189,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation libs.avro.avro
     testImplementation libs.parquet.hadoop
     testImplementation libs.awaitility
-    testImplementation "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     runtimeOnly libs.antlr.runtime413
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index db4230f2f6f3..99e827c69cb9 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -148,7 +148,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  private boolean useParquetBatchReads() {
+  protected boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de098..4295a64438c3 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
@@ -37,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -95,7 +97,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
   private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
   private static final String NDV_KEY = "ndv";
 
@@ -351,4 +353,14 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
       return splitSize;
     }
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
+      SparkBatch batch = (SparkBatch) this.toBatch();
+      return batch.useParquetBatchReads();
+    }
+
+    return false;
+  }
 }
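Comet decides whether to swap a Spark scan for its native scan by probing the scan for the SupportsComet interface and calling isCometEnabled(), which is what PATCH 1 implements on SparkScan. The sketch below shows how a session would opt into that path. It is a minimal illustration, assuming Comet's documented plugin entry point and Iceberg's spark.sql.iceberg.parquet.reader-type session property; the demo.db.events table and the local-mode setup are hypothetical placeholders.

    import org.apache.spark.sql.SparkSession;

    public class CometScanExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[*]")
                .appName("comet-scan-example")
                // load Comet so it can swap eligible scans for native ones
                .config("spark.plugins", "org.apache.spark.CometPlugin")
                // ask Iceberg to build Comet-backed vectorized Parquet readers
                .config("spark.sql.iceberg.parquet.reader-type", "COMET")
                .getOrCreate();

        // while planning this query, Comet calls SparkScan#isCometEnabled();
        // with PATCH 1 it returns true only for all-Parquet vectorized scans
        spark.sql("SELECT * FROM demo.db.events").show();
      }
    }

Note that PATCH 1 gates only on the reader type plus useParquetBatchReads(); the Comet-specific schema and task checks arrive in the next two patches.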
From 35b4dde84acfea771fe572db937e4869c2e480df Mon Sep 17 00:00:00 2001
From: hsiang-c
Date: Tue, 12 Aug 2025 22:14:04 -0700
Subject: [PATCH 2/4] Fallback to Spark whenever there are delete files

---
 .../iceberg/spark/source/SparkBatch.java | 19 +++++++++++++++++--
 .../iceberg/spark/source/SparkBatch.java | 19 +++++++++++++++++--
 .../iceberg/spark/source/SparkBatch.java | 21 ++++++++++++++++++---
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 99e827c69cb9..2714d54bfd01 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -172,11 +172,11 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,6 +186,21 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
+  private boolean supportsCometBatchReads(ScanTask task) {
+    if (task instanceof ScanTaskGroup) {
+      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
+      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
+
+    } else if (task.isFileScanTask() && !task.isDataTask()) {
+      FileScanTask fileScanTask = task.asFileScanTask();
+      // Comet can't handle delete files for now
+      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
+
+    } else {
+      return false;
+    }
+  }
+
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 99e827c69cb9..2714d54bfd01 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -172,11 +172,11 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,6 +186,21 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
+  private boolean supportsCometBatchReads(ScanTask task) {
+    if (task instanceof ScanTaskGroup) {
+      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
+      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
+
+    } else if (task.isFileScanTask() && !task.isDataTask()) {
+      FileScanTask fileScanTask = task.asFileScanTask();
+      // Comet can't handle delete files for now
+      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
+
+    } else {
+      return false;
+    }
+  }
+
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 99e827c69cb9..449c7f97295b 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -148,7 +148,7 @@ private OrcBatchReadConf orcBatchReadConf() {
   // - Parquet vectorization is enabled
   // - only primitives or metadata columns are projected
   // - all tasks are of FileScanTask type and read only Parquet files
-  protected boolean useParquetBatchReads() {
+  private boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
@@ -172,11 +172,11 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
     return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
   }
 
-  private boolean useCometBatchReads() {
+  protected boolean useCometBatchReads() {
    return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,6 +186,21 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
+  private boolean supportsCometBatchReads(ScanTask task) {
+    if (task instanceof ScanTaskGroup) {
+      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
+      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
+
+    } else if (task.isFileScanTask() && !task.isDataTask()) {
+      FileScanTask fileScanTask = task.asFileScanTask();
+      // Comet can't handle delete files for now
+      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
+
+    } else {
+      return false;
+    }
+  }
+
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
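PATCH 2 makes Comet eligibility a property of the whole scan: every task group, and every file task inside it, must read Parquet and carry no delete files, otherwise the scan falls back to Spark's own vectorized reader. The stand-alone model below restates that recursive all-match with invented Leaf/Group stand-ins for Iceberg's FileScanTask/ScanTaskGroup; it illustrates the logic only and is not the patched code itself.

    import java.util.List;

    final class CometEligibility {
      sealed interface Task permits Leaf, Group {}

      // a leaf models a FileScanTask: a file format plus attached delete files
      record Leaf(String format, int deleteFiles) implements Task {}

      // a group models a ScanTaskGroup: a bundle of nested tasks
      record Group(List<Task> tasks) implements Task {}

      static boolean supportsCometBatchReads(Task task) {
        if (task instanceof Group group) {
          // a group qualifies only if every nested task qualifies
          return group.tasks().stream().allMatch(CometEligibility::supportsCometBatchReads);
        } else if (task instanceof Leaf leaf) {
          // mirror of the patched check: Parquet only, and no delete files
          return "PARQUET".equals(leaf.format()) && leaf.deleteFiles() == 0;
        }
        return false;
      }

      public static void main(String[] args) {
        Task clean = new Group(List.of(new Leaf("PARQUET", 0), new Leaf("PARQUET", 0)));
        Task withDeletes = new Group(List.of(new Leaf("PARQUET", 0), new Leaf("PARQUET", 1)));
        System.out.println(supportsCometBatchReads(clean)); // true
        System.out.println(supportsCometBatchReads(withDeletes)); // false
      }
    }

Because the check is an all-match, a single delete file anywhere in the task tree disables Comet for the entire scan rather than per file, which keeps the whole query on one reader implementation.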
From 92c9376c30097333a7eea198ceabed75487fcf5f Mon Sep 17 00:00:00 2001
From: hsiang-c
Date: Tue, 12 Aug 2025 22:14:16 -0700
Subject: [PATCH 3/4] Use useCometBatchReads

---
 .../java/org/apache/iceberg/spark/source/SparkScan.java | 9 ++-------
 .../java/org/apache/iceberg/spark/source/SparkScan.java | 9 ++-------
 .../java/org/apache/iceberg/spark/source/SparkScan.java | 9 ++-------
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 4295a64438c3..967b0d41d081 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -356,11 +355,7 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
 
   @Override
   public boolean isCometEnabled() {
-    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
-      SparkBatch batch = (SparkBatch) this.toBatch();
-      return batch.useParquetBatchReads();
-    }
-
-    return false;
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 4295a64438c3..967b0d41d081 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -356,11 +355,7 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
 
   @Override
   public boolean isCometEnabled() {
-    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
-      SparkBatch batch = (SparkBatch) this.toBatch();
-      return batch.useParquetBatchReads();
-    }
-
-    return false;
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 4295a64438c3..967b0d41d081 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -38,7 +38,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -356,11 +355,7 @@ protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
 
   @Override
   public boolean isCometEnabled() {
-    if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
-      SparkBatch batch = (SparkBatch) this.toBatch();
-      return batch.useParquetBatchReads();
-    }
-
-    return false;
+    SparkBatch batch = (SparkBatch) this.toBatch();
+    return batch.useCometBatchReads();
  }
 }
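PATCH 3 can drop the scan-side ParquetReaderType.COMET guard because useCometBatchReads() already tests the reader type as its second clause, so the guard was redundant; the only observable difference is that toBatch() is now constructed even when the reader type is ICEBERG. The toy model below, with invented Scan/Batch stand-ins, shows why the unconditional delegation is behaviorally equivalent.

    enum ParquetReaderType {
      ICEBERG,
      COMET
    }

    final class Batch {
      private final boolean vectorizationEnabled;
      private final ParquetReaderType readerType;

      Batch(boolean vectorizationEnabled, ParquetReaderType readerType) {
        this.vectorizationEnabled = vectorizationEnabled;
        this.readerType = readerType;
      }

      boolean useCometBatchReads() {
        // the COMET check lives here, so callers need not repeat it
        return vectorizationEnabled && readerType == ParquetReaderType.COMET;
      }
    }

    final class Scan {
      private final Batch batch;

      Scan(Batch batch) {
        this.batch = batch;
      }

      boolean isCometEnabled() {
        return batch.useCometBatchReads(); // no redundant outer guard
      }

      public static void main(String[] args) {
        System.out.println(new Scan(new Batch(true, ParquetReaderType.COMET)).isCometEnabled()); // true
        System.out.println(new Scan(new Batch(true, ParquetReaderType.ICEBERG)).isCometEnabled()); // false
      }
    }

Centralizing the decision in SparkBatch also lets the ParquetReaderType import disappear from SparkScan, which is what the first hunk of each file in this patch does.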
From 90b4b4fa17a2a403260a6731c8772fdb5341dedf Mon Sep 17 00:00:00 2001
From: hsiang-c
Date: Wed, 17 Sep 2025 07:54:35 +0800
Subject: [PATCH 4/4] Use Comet 0.10.0 w/ deletion support

---
 gradle/libs.versions.toml                |  2 +-
 .../iceberg/spark/source/SparkBatch.java | 17 +----------------
 .../iceberg/spark/source/SparkBatch.java | 17 +----------------
 .../iceberg/spark/source/SparkBatch.java | 17 +----------------
 4 files changed, 4 insertions(+), 49 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 2e15f1545dbc..e89059d19567 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.40.0"
-comet = "0.10.0-SNAPSHOT"
+comet = "0.10.0"
 datasketches = "6.2.0"
 delta-standalone = "3.3.2"
 delta-spark = "3.3.2"
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 2714d54bfd01..c5f1723ea791 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -176,7 +176,7 @@ protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
+        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,21 +186,6 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
-  private boolean supportsCometBatchReads(ScanTask task) {
-    if (task instanceof ScanTaskGroup) {
-      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
-      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
-
-    } else if (task.isFileScanTask() && !task.isDataTask()) {
-      FileScanTask fileScanTask = task.asFileScanTask();
-      // Comet can't handle delete files for now
-      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
-
-    } else {
-      return false;
-    }
-  }
-
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 2714d54bfd01..1645d613b9ef 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -176,7 +176,7 @@ protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
+        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,21 +186,6 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
-  private boolean supportsCometBatchReads(ScanTask task) {
-    if (task instanceof ScanTaskGroup) {
-      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
-      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
-
-    } else if (task.isFileScanTask() && !task.isDataTask()) {
-      FileScanTask fileScanTask = task.asFileScanTask();
-      // Comet can't handle delete files for now
-      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
-
-    } else {
-      return false;
-    }
-  }
-
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 449c7f97295b..30229d5e5e31 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -176,7 +176,7 @@ protected boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
         && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
-        && taskGroups.stream().allMatch(this::supportsCometBatchReads);
+        && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
   private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,21 +186,6 @@ private boolean supportsCometBatchReads(Types.NestedField field) {
         && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
   }
 
-  private boolean supportsCometBatchReads(ScanTask task) {
-    if (task instanceof ScanTaskGroup) {
-      ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
-      return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
-
-    } else if (task.isFileScanTask() && !task.isDataTask()) {
-      FileScanTask fileScanTask = task.asFileScanTask();
-      // Comet can't handle delete files for now
-      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
-
-    } else {
-      return false;
-    }
-  }
-
   // conditions for using ORC batch reads:
   // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
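With Comet 0.10.0 able to apply delete files natively, PATCH 4 drops the delete-file fallback from PATCH 2 and reverts the per-task check to the plain Parquet condition. The sketch below shows the scenario this unlocks: a merge-on-read table whose delete files previously forced isCometEnabled() to return false. The session configuration is carried over from the PATCH 1 sketch, and the catalog, table name, and table properties are illustrative assumptions, not taken from this patch series.

    import org.apache.spark.sql.SparkSession;

    public class CometDeleteFileExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[*]")
                .config("spark.plugins", "org.apache.spark.CometPlugin")
                .config("spark.sql.iceberg.parquet.reader-type", "COMET")
                .getOrCreate();

        spark.sql(
            "CREATE TABLE demo.db.events (id BIGINT, payload STRING) USING iceberg "
                + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')");
        spark.sql("INSERT INTO demo.db.events VALUES (1, 'a'), (2, 'b')");
        // a merge-on-read DELETE writes a delete file instead of rewriting data
        spark.sql("DELETE FROM demo.db.events WHERE id = 1");

        // before PATCH 4 the delete file disabled the Comet path for this scan;
        // with Comet 0.10.0 the scan can stay on the native reader
        spark.sql("SELECT * FROM demo.db.events").show();
      }
    }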