From 8df3ab41dae529c88840dce747c41210a210effc Mon Sep 17 00:00:00 2001 From: huaxingao Date: Tue, 24 Jun 2025 15:20:26 -0700 Subject: [PATCH 1/7] Spark:3.4 Encapsulate parquet objects for Comet --- build.gradle | 8 + .../parquet/CometVectorizedParquetReader.java | 315 ++++++++++++++++++ .../org/apache/iceberg/parquet/Parquet.java | 47 ++- .../org/apache/iceberg/parquet/ReadConf.java | 8 + .../data/vectorized/CometColumnReader.java | 92 ++++- .../vectorized/CometColumnarBatchReader.java | 19 +- .../CometVectorizedReaderBuilder.java | 3 +- .../iceberg/spark/source/BaseBatchReader.java | 1 + 8 files changed, 467 insertions(+), 26 deletions(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java diff --git a/build.gradle b/build.gradle index bf09d1b492ff..79603391c2ea 100644 --- a/build.gradle +++ b/build.gradle @@ -845,6 +845,7 @@ project(':iceberg-orc') { } project(':iceberg-parquet') { + test { useJUnitPlatform() } @@ -854,6 +855,13 @@ project(':iceberg-parquet') { implementation project(':iceberg-core') implementation project(':iceberg-common') + implementation("org.apache.datafusion:comet-spark-spark${ sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") { + exclude group: 'org.apache.arrow' + exclude group: 'org.apache.parquet' + exclude group: 'org.apache.spark' + exclude group: 'org.apache.iceberg' + } + implementation(libs.parquet.avro) { exclude group: 'org.apache.avro', module: 'avro' // already shaded by Parquet diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java new file mode 100644 index 000000000000..5b5555779e4d --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.Function; +import org.apache.comet.parquet.FileReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.ReadOptions; +import org.apache.comet.parquet.RowGroupReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class CometVectorizedParquetReader extends CloseableGroup + implements CloseableIterable { + private final InputFile input; + private final ParquetReadOptions options; + private final Schema expectedSchema; + private final Function> batchReaderFunc; + private final Expression filter; + private final boolean reuseContainers; + private final boolean caseSensitive; + private final int batchSize; + private final NameMapping nameMapping; + private final Map properties; + private Long start = null; + private Long length = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + public CometVectorizedParquetReader( + InputFile input, + Schema expectedSchema, + ParquetReadOptions options, + Function> readerFunc, + NameMapping nameMapping, + Expression filter, + boolean reuseContainers, + boolean caseSensitive, + int maxRecordsPerBatch, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.input = input; + this.expectedSchema = expectedSchema; + this.options = options; + this.batchReaderFunc = readerFunc; + // replace alwaysTrue with null to avoid extra work evaluating a trivial filter + this.filter = filter == Expressions.alwaysTrue() ? 
null : filter; + this.reuseContainers = reuseContainers; + this.caseSensitive = caseSensitive; + this.batchSize = maxRecordsPerBatch; + this.nameMapping = nameMapping; + this.properties = properties; + this.start = start; + this.length = length; + this.fileEncryptionKey = fileEncryptionKey; + this.fileAADPrefix = fileAADPrefix; + } + + private ReadConf conf = null; + + private ReadConf init() { + if (conf == null) { + ReadConf readConf = + new ReadConf( + input, + options, + expectedSchema, + filter, + null, + batchReaderFunc, + nameMapping, + reuseContainers, + caseSensitive, + batchSize); + this.conf = readConf.copy(); + return readConf; + } + return conf; + } + + @Override + public CloseableIterator iterator() { + FileIterator iter = + new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); + addCloseable(iter); + return iter; + } + + private static class FileIterator implements CloseableIterator { + // private final ParquetFileReader reader; + private final boolean[] shouldSkip; + private final VectorizedReader model; + private final long totalValues; + private final int batchSize; + private final List> columnChunkMetadata; + private final boolean reuseContainers; + private int nextRowGroup = 0; + private long nextRowGroupStart = 0; + private long valuesRead = 0; + private T last = null; + private final FileReader cometReader; + + FileIterator( + ReadConf conf, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + this.shouldSkip = conf.shouldSkip(); + this.totalValues = conf.totalValues(); + this.reuseContainers = conf.reuseContainers(); + this.model = conf.vectorizedModel(); + this.batchSize = conf.batchSize(); + this.model.setBatchSize(this.batchSize); + this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); + this.cometReader = + newCometReader( + conf.file(), + conf.projection(), + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } + + private FileReader newCometReader( + InputFile file, + MessageType projection, + Map properties, + Long start, + Long length, + ByteBuffer fileEncryptionKey, + ByteBuffer fileAADPrefix) { + try { + ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); + + FileReader fileReader = + new FileReader( + ((HadoopInputFile) file).getPath(), + new Configuration(((HadoopInputFile) file).getConf()), + cometOptions, + properties, + start, + length, + ByteBuffers.toByteArray(fileEncryptionKey), + ByteBuffers.toByteArray(fileAADPrefix)); + + List columnDescriptors = projection.getColumns(); + + List specs = Lists.newArrayList(); + + for (ColumnDescriptor descriptor : columnDescriptors) { + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() + == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? 
primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + // ToDo: extract this into a Util method + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType + instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + ParquetColumnSpec spec = + new ParquetColumnSpec( + 1, // ToDo: pass in the correct id + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + specs.add(spec); + } + + fileReader.setRequestedSchemaFromSpecs(specs); + return fileReader; + } catch (IOException e) { + throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); + } + } + + @Override + public boolean hasNext() { + return valuesRead < totalValues; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + if (valuesRead >= nextRowGroupStart) { + advance(); + } + + // batchSize is an integer, so casting to integer is safe + int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); + if (reuseContainers) { + this.last = model.read(last, numValuesToRead); + } else { + this.last = model.read(null, numValuesToRead); + } + valuesRead += numValuesToRead; + + return last; + } + + private void advance() { + while (shouldSkip[nextRowGroup]) { + nextRowGroup += 1; + cometReader.skipNextRowGroup(); + } + RowGroupReader pages; + try { + pages = cometReader.readNextRowGroup(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); + nextRowGroupStart += pages.getRowCount(); + nextRowGroup += 1; + } + + @Override + public void close() throws IOException { + model.close(); + cometReader.close(); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..f9df53b038b8 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1161,6 +1161,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private NameMapping nameMapping = null; private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; + private boolean isComet; private ReadBuilder(InputFile file) { this.file = file; @@ -1289,6 +1290,11 @@ public ReadBuilder setCustomType(int fieldId, Class struct throw new UnsupportedOperationException("Custom types are not yet supported"); } + public ReadBuilder enableComet(boolean enableComet) { + this.isComet = enableComet; + return this; + } + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { this.fileEncryptionKey = encryptionKey; return this; @@ -1300,7 +1306,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { } @Override - @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"}) public CloseableIterable build() { FileDecryptionProperties fileDecryptionProperties = null; if (fileEncryptionKey != null) { @@ -1352,16 +1358,35 @@ public CloseableIterable build() { } if (batchedReaderFunc != null) { - return new VectorizedParquetReader<>( - file, - schema, - options, - batchedReaderFunc, - mapping, - filter, - reuseContainers, - caseSensitive, - maxRecordsPerBatch); + if (isComet) { + LOG.info("Comet enabled"); + return new CometVectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + mapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch, + properties, + start, + length, + fileEncryptionKey, + fileAADPrefix); + } else { + return new VectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + mapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch); + } } else { Function> readBuilder = readerFuncWithSchema != null diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 1fb2372ba568..142e5fbadf1f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -157,6 +157,14 @@ ParquetFileReader reader() { return newReader; } + InputFile file() { + return file; + } + + MessageType projection() { + return projection; + } + ParquetValueReader model() { return model; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 81b7d83a7077..1ddd4d0dfd4d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,18 +19,26 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; +import java.util.Map; +import org.apache.comet.CometConf; import org.apache.comet.CometSchemaImporter; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.ColumnReader; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.comet.parquet.RowGroupReader; import org.apache.comet.parquet.TypeUtil; import 
org.apache.comet.parquet.Utils; import org.apache.comet.shaded.arrow.memory.RootAllocator; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -42,16 +50,19 @@ class CometColumnReader implements VectorizedReader { private final ColumnDescriptor descriptor; private final DataType sparkType; + private final int fieldId; // The delegated ColumnReader from Comet side private AbstractColumnReader delegate; private boolean initialized = false; private int batchSize = DEFAULT_BATCH_SIZE; private CometSchemaImporter importer; + private ParquetColumnSpec spec; - CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { + CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { this.sparkType = sparkType; this.descriptor = descriptor; + this.fieldId = fieldId; } CometColumnReader(Types.NestedField field) { @@ -59,6 +70,7 @@ class CometColumnReader implements VectorizedReader { StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); this.sparkType = dataType; this.descriptor = TypeUtil.convertToParquet(structField); + this.fieldId = field.fieldId(); } public AbstractColumnReader delegate() { @@ -92,7 +104,77 @@ public void reset() { } this.importer = new CometSchemaImporter(new RootAllocator()); - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + // ToDo: extract this into a Util method + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", 
String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? primitiveType.getTypeLength() + : 0; + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + spec = + new ParquetColumnSpec( + fieldId, + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + + boolean useLegacyTime = + Boolean.parseBoolean( + SQLConf.get() + .getConfString( + CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); + boolean useLazyMaterialization = + Boolean.parseBoolean( + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "true")); + this.delegate = + Utils.getColumnReader( + sparkType, + spec, + importer, + batchSize, + true, // Comet sets this to true for native execution + useLazyMaterialization, + useLegacyTime); this.initialized = true; } @@ -111,9 +193,9 @@ public DataType sparkType() { *

NOTE: this should be called before reading a new Parquet column chunk, and after {@link * CometColumnReader#reset} is called. */ - public void setPageReader(PageReader pageReader) throws IOException { + public void setPageReader(RowGroupReader pageStore) throws IOException { Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); - ((ColumnReader) delegate).setPageReader(pageReader); + ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 04ac69476add..52d19ec2daf7 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Map; import org.apache.comet.parquet.AbstractColumnReader; -import org.apache.comet.parquet.BatchReader; +import org.apache.comet.parquet.IcebergCometBatchReader; +import org.apache.comet.parquet.RowGroupReader; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; @@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader { // calling BatchReader.nextBatch, the isDeleted value is not yet available, so // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is // available. - private final BatchReader delegate; + private final IcebergCometBatchReader delegate; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; @@ -64,10 +65,7 @@ class CometColumnarBatchReader implements VectorizedReader { readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new); this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader); - - AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; - this.delegate = new BatchReader(abstractColumnReaders); - delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); + this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); } @Override @@ -79,19 +77,22 @@ public void setRowGroupInfo( && !(readers[i] instanceof CometPositionColumnReader) && !(readers[i] instanceof CometDeleteColumnReader)) { readers[i].reset(); - readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); + readers[i].setPageReader((RowGroupReader) pageStore); } } catch (IOException e) { throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); } } + AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; for (int i = 0; i < readers.length; i++) { - delegate.getColumnReaders()[i] = this.readers[i].delegate(); + delegateReaders[i] = readers[i].delegate(); } + delegate.init(delegateReaders); + this.rowStartPosInBatch = - pageStore + ((RowGroupReader) pageStore) .getRowIndexOffset() .orElseThrow( () -> diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java index d36f1a727477..56f8c9bff933 100644 --- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java @@ -142,6 +142,7 @@ public VectorizedReader primitive( return null; } - return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc); + return new CometColumnReader( + SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index 780e1750a52e..57892ac4c59d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -109,6 +109,7 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. .reuseContainers() .withNameMapping(nameMapping()) + .enableComet(parquetConf.readerType() == ParquetReaderType.COMET) .build(); } From 4d033e05efe7f007e5b866cdf0913aa9b9fd94d2 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Tue, 5 Aug 2025 17:44:31 -0700 Subject: [PATCH 2/7] Update for changes to Comet apis. --- gradle/libs.versions.toml | 2 +- .../iceberg/parquet/CometTypeUtils.java | 255 ++++++++++++++++++ .../parquet/CometVectorizedParquetReader.java | 66 +---- spark/v3.4/build.gradle | 2 + .../data/vectorized/CometColumnReader.java | 64 +---- .../vectorized/CometConstantColumnReader.java | 7 +- .../vectorized/CometDeleteColumnReader.java | 2 +- .../vectorized/CometPositionColumnReader.java | 3 +- spark/v3.5/build.gradle | 2 + .../data/vectorized/CometColumnReader.java | 40 ++- .../vectorized/CometColumnarBatchReader.java | 19 +- .../vectorized/CometConstantColumnReader.java | 7 +- .../vectorized/CometDeleteColumnReader.java | 4 +- .../vectorized/CometPositionColumnReader.java | 3 +- .../CometVectorizedReaderBuilder.java | 3 +- .../iceberg/spark/source/BaseBatchReader.java | 1 + spark/v4.0/build.gradle | 2 + .../data/vectorized/CometColumnReader.java | 40 ++- .../vectorized/CometColumnarBatchReader.java | 18 +- .../vectorized/CometConstantColumnReader.java | 7 +- .../vectorized/CometDeleteColumnReader.java | 2 +- .../vectorized/CometPositionColumnReader.java | 3 +- .../CometVectorizedReaderBuilder.java | 3 +- .../iceberg/spark/source/BaseBatchReader.java | 1 + 24 files changed, 392 insertions(+), 164 deletions(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e4555ff70942..3b8a60d95ac6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0" bson-ver = "4.11.5" caffeine = "2.9.3" calcite = "1.40.0" -comet = "0.8.1" +comet = "0.10.0-SNAPSHOT" datasketches = "6.2.0" delta-standalone = "3.3.2" delta-spark = "3.3.2" diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java new file mode 100644 index 000000000000..ddf6c7de5ae8 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.util.Map; +import org.apache.comet.parquet.ParquetColumnSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +public class CometTypeUtils { + + private CometTypeUtils() {} + + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { + + String[] path = descriptor.getPath(); + PrimitiveType primitiveType = descriptor.getPrimitiveType(); + String physicalType = primitiveType.getPrimitiveTypeName().name(); + + int typeLength = + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + ? primitiveType.getTypeLength() + : 0; + + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; + + // ToDo: extract this into a Util method + String logicalTypeName = null; + Map logicalTypeParams = Maps.newHashMap(); + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); + + if (logicalType != null) { + logicalTypeName = logicalType.getClass().getSimpleName(); + + // Handle specific logical types + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); + logicalTypeParams.put("unit", timestamp.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); + logicalTypeParams.put("unit", time.getUnit().name()); + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); + } + } + + return new ParquetColumnSpec( + 1, // ToDo: pass in the 
correct id + path, + physicalType, + typeLength, + isRepeated, + descriptor.getMaxDefinitionLevel(), + descriptor.getMaxRepetitionLevel(), + logicalTypeName, + logicalTypeParams); + } + + public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { + PrimitiveType.PrimitiveTypeName primType = + PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); + + Type.Repetition repetition; + if (columnSpec.getMaxRepetitionLevel() > 0) { + repetition = Type.Repetition.REPEATED; + } else if (columnSpec.getMaxDefinitionLevel() > 0) { + repetition = Type.Repetition.OPTIONAL; + } else { + repetition = Type.Repetition.REQUIRED; + } + + String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; + // Reconstruct the logical type from parameters + LogicalTypeAnnotation logicalType = null; + if (columnSpec.getLogicalTypeName() != null) { + logicalType = + reconstructLogicalType( + columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); + } + + PrimitiveType primitiveType; + if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + primitiveType = + org.apache.parquet.schema.Types.primitive(primType, repetition) + .length(columnSpec.getTypeLength()) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } else { + primitiveType = + Types.primitive(primType, repetition) + .as(logicalType) + .id(columnSpec.getFieldId()) + .named(name); + } + + return new ColumnDescriptor( + columnSpec.getPath(), + primitiveType, + columnSpec.getMaxRepetitionLevel(), + columnSpec.getMaxDefinitionLevel()); + } + + private static LogicalTypeAnnotation reconstructLogicalType( + String logicalTypeName, java.util.Map params) { + + switch (logicalTypeName) { + // MAP + case "MapLogicalTypeAnnotation": + return LogicalTypeAnnotation.mapType(); + + // LIST + case "ListLogicalTypeAnnotation": + return LogicalTypeAnnotation.listType(); + + // STRING + case "StringLogicalTypeAnnotation": + return LogicalTypeAnnotation.stringType(); + + // MAP_KEY_VALUE + case "MapKeyValueLogicalTypeAnnotation": + return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); + + // ENUM + case "EnumLogicalTypeAnnotation": + return LogicalTypeAnnotation.enumType(); + + // DECIMAL + case "DecimalLogicalTypeAnnotation": + if (!params.containsKey("scale") || !params.containsKey("precision")) { + throw new IllegalArgumentException( + "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); + } + int scale = Integer.parseInt(params.get("scale")); + int precision = Integer.parseInt(params.get("precision")); + return LogicalTypeAnnotation.decimalType(scale, precision); + + // DATE + case "DateLogicalTypeAnnotation": + return LogicalTypeAnnotation.dateType(); + + // TIME + case "TimeLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimeLogicalTypeAnnotation: " + params); + } + + boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String timeUnitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit timeUnit; + switch (timeUnitStr) { + case "MILLIS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); + } + return LogicalTypeAnnotation.timeType(isUTC, 
timeUnit); + + // TIMESTAMP + case "TimestampLogicalTypeAnnotation": + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { + throw new IllegalArgumentException( + "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); + } + boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); + String unitStr = params.get("unit"); + + LogicalTypeAnnotation.TimeUnit unit; + switch (unitStr) { + case "MILLIS": + unit = LogicalTypeAnnotation.TimeUnit.MILLIS; + break; + case "MICROS": + unit = LogicalTypeAnnotation.TimeUnit.MICROS; + break; + case "NANOS": + unit = LogicalTypeAnnotation.TimeUnit.NANOS; + break; + default: + throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); + } + return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); + + // INTEGER + case "IntLogicalTypeAnnotation": + if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { + throw new IllegalArgumentException( + "Missing required parameters for IntLogicalTypeAnnotation: " + params); + } + boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); + int bitWidth = Integer.parseInt(params.get("bitWidth")); + return LogicalTypeAnnotation.intType(bitWidth, isSigned); + + // JSON + case "JsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.jsonType(); + + // BSON + case "BsonLogicalTypeAnnotation": + return LogicalTypeAnnotation.bsonType(); + + // UUID + case "UUIDLogicalTypeAnnotation": + return LogicalTypeAnnotation.uuidType(); + + // INTERVAL + case "IntervalLogicalTypeAnnotation": + return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java index 5b5555779e4d..88b195b76a2d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java @@ -29,28 +29,24 @@ import org.apache.comet.parquet.ParquetColumnSpec; import org.apache.comet.parquet.ReadOptions; import org.apache.comet.parquet.RowGroupReader; +import org.apache.comet.parquet.WrappedInputFile; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; public class CometVectorizedParquetReader extends CloseableGroup 
implements CloseableIterable { @@ -183,8 +179,7 @@ private FileReader newCometReader( FileReader fileReader = new FileReader( - ((HadoopInputFile) file).getPath(), - new Configuration(((HadoopInputFile) file).getConf()), + new WrappedInputFile(file), cometOptions, properties, start, @@ -197,62 +192,7 @@ private FileReader newCometReader( List specs = Lists.newArrayList(); for (ColumnDescriptor descriptor : columnDescriptors) { - String[] path = descriptor.getPath(); - PrimitiveType primitiveType = descriptor.getPrimitiveType(); - String physicalType = primitiveType.getPrimitiveTypeName().name(); - - int typeLength = - primitiveType.getPrimitiveTypeName() - == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY - ? primitiveType.getTypeLength() - : 0; - - boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; - - // ToDo: extract this into a Util method - String logicalTypeName = null; - Map logicalTypeParams = Maps.newHashMap(); - LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); - - if (logicalType != null) { - logicalTypeName = logicalType.getClass().getSimpleName(); - - // Handle specific logical types - if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = - (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); - logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); - } else if (logicalType - instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = - (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); - logicalTypeParams.put("unit", timestamp.getUnit().name()); - } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { - LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = - (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); - logicalTypeParams.put("unit", time.getUnit().name()); - } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { - LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = - (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); - logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); - } - } - - ParquetColumnSpec spec = - new ParquetColumnSpec( - 1, // ToDo: pass in the correct id - path, - physicalType, - typeLength, - isRepeated, - descriptor.getMaxDefinitionLevel(), - descriptor.getMaxRepetitionLevel(), - logicalTypeName, - logicalTypeParams); + ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); specs.add(spec); } diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle index 714be0831d8e..ca3b9dc41141 100644 --- a/spark/v3.4/build.gradle +++ b/spark/v3.4/build.gradle @@ -264,6 +264,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: 
":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" + // runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 1ddd4d0dfd4d..eba1a2a0fb15 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; -import java.util.Map; import org.apache.comet.CometConf; import org.apache.comet.CometSchemaImporter; import org.apache.comet.parquet.AbstractColumnReader; @@ -29,15 +28,12 @@ import org.apache.comet.parquet.TypeUtil; import org.apache.comet.parquet.Utils; import org.apache.comet.shaded.arrow.memory.RootAllocator; +import org.apache.iceberg.parquet.CometTypeUtils; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Metadata; @@ -69,7 +65,8 @@ class CometColumnReader implements VectorizedReader { DataType dataType = SparkSchemaUtil.convert(field.type()); StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); this.sparkType = dataType; - this.descriptor = TypeUtil.convertToParquet(structField); + this.descriptor = + CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); this.fieldId = field.fieldId(); } @@ -105,58 +102,7 @@ public void reset() { this.importer = new CometSchemaImporter(new RootAllocator()); - String[] path = descriptor.getPath(); - PrimitiveType primitiveType = descriptor.getPrimitiveType(); - String physicalType = primitiveType.getPrimitiveTypeName().name(); - - // ToDo: extract this into a Util method - String logicalTypeName = null; - Map logicalTypeParams = Maps.newHashMap(); - LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); - - if (logicalType != null) { - logicalTypeName = logicalType.getClass().getSimpleName(); - - // Handle specific logical types - if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = - (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); - logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); - } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = - (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; - 
logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); - logicalTypeParams.put("unit", timestamp.getUnit().name()); - } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { - LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = - (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); - logicalTypeParams.put("unit", time.getUnit().name()); - } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { - LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = - (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; - logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); - logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); - } - } - - int typeLength = - primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY - ? primitiveType.getTypeLength() - : 0; - boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; - spec = - new ParquetColumnSpec( - fieldId, - path, - physicalType, - typeLength, - isRepeated, - descriptor.getMaxDefinitionLevel(), - descriptor.getMaxRepetitionLevel(), - logicalTypeName, - logicalTypeParams); + spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); boolean useLegacyTime = Boolean.parseBoolean( @@ -165,7 +111,7 @@ public void reset() { CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); boolean useLazyMaterialization = Boolean.parseBoolean( - SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "true")); + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); this.delegate = Utils.getColumnReader( sparkType, diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java index c665002e8f66..6201988ee2b5 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import org.apache.comet.parquet.ConstantColumnReader; +import org.apache.iceberg.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { super(field); // use delegate to set constant value on the native side to be consumed by native execution. 
setDelegate( - new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); + new ConstantColumnReader( + sparkType(), + CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), + convertToSparkValue(value), + false)); } @Override diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java index 4a28fc51da9b..cba108e4326e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java @@ -51,7 +51,7 @@ private static class DeleteColumnReader extends MetadataColumnReader { DeleteColumnReader() { super( DataTypes.BooleanType, - TypeUtil.convertToParquet( + TypeUtil.convertToParquetSpec( new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), false /* useDecimal128 = false */, false /* isConstant */); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java index 1949a717982a..3416d47bd8b4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java @@ -20,6 +20,7 @@ import org.apache.comet.parquet.MetadataColumnReader; import org.apache.comet.parquet.Native; +import org.apache.iceberg.parquet.CometTypeUtils; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.spark.sql.types.DataTypes; @@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader { PositionColumnReader(ColumnDescriptor descriptor) { super( DataTypes.LongType, - descriptor, + CometTypeUtils.descriptorToParquetColumnSpec(descriptor), false /* useDecimal128 = false */, false /* isConstant */); } diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index 69700d84366d..138240f566c2 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -264,6 +264,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" + // runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java index 81b7d83a7077..eba1a2a0fb15 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -19,18 +19,22 @@ 
package org.apache.iceberg.spark.data.vectorized;
 
 import java.io.IOException;
+import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
 import org.apache.comet.parquet.AbstractColumnReader;
 import org.apache.comet.parquet.ColumnReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.RowGroupReader;
 import org.apache.comet.parquet.TypeUtil;
 import org.apache.comet.parquet.Utils;
 import org.apache.comet.shaded.arrow.memory.RootAllocator;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader {
 
   private final ColumnDescriptor descriptor;
   private final DataType sparkType;
+  private final int fieldId;
 
   // The delegated ColumnReader from Comet side
   private AbstractColumnReader delegate;
   private boolean initialized = false;
   private int batchSize = DEFAULT_BATCH_SIZE;
   private CometSchemaImporter importer;
+  private ParquetColumnSpec spec;
 
-  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
+  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) {
     this.sparkType = sparkType;
     this.descriptor = descriptor;
+    this.fieldId = fieldId;
   }
 
   CometColumnReader(Types.NestedField field) {
     DataType dataType = SparkSchemaUtil.convert(field.type());
     StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
     this.sparkType = dataType;
-    this.descriptor = TypeUtil.convertToParquet(structField);
+    this.descriptor =
+        CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
+    this.fieldId = field.fieldId();
   }
 
   public AbstractColumnReader delegate() {
@@ -92,7 +101,26 @@ public void reset() {
     }
 
     this.importer = new CometSchemaImporter(new RootAllocator());
-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+
+    spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+
+    boolean useLegacyTime =
+        Boolean.parseBoolean(
+            SQLConf.get()
+                .getConfString(
+                    CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
+    boolean useLazyMaterialization =
+        Boolean.parseBoolean(
+            SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false"));
+    this.delegate =
+        Utils.getColumnReader(
+            sparkType,
+            spec,
+            importer,
+            batchSize,
+            true, // Comet sets this to true for native execution
+            useLazyMaterialization,
+            useLegacyTime);
 
     this.initialized = true;
   }
@@ -111,9 +139,9 @@ public DataType sparkType() {
   * <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
   * CometColumnReader#reset} is called.
   */
-  public void setPageReader(PageReader pageReader) throws IOException {
+  public void setPageReader(RowGroupReader pageStore) throws IOException {
     Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
-    ((ColumnReader) delegate).setPageReader(pageReader);
+    ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476add..52d19ec2daf7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -23,7 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.comet.parquet.AbstractColumnReader;
-import org.apache.comet.parquet.BatchReader;
+import org.apache.comet.parquet.IcebergCometBatchReader;
+import org.apache.comet.parquet.RowGroupReader;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
@@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader {
   // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
   // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
   // available.
-  private final BatchReader delegate;
+  private final IcebergCometBatchReader delegate;
   private DeleteFilter deletes = null;
   private long rowStartPosInBatch = 0;
 
@@ -64,10 +65,7 @@ class CometColumnarBatchReader implements VectorizedReader {
         readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new);
     this.hasIsDeletedColumn =
         readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
-
-    AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
-    this.delegate = new BatchReader(abstractColumnReaders);
-    delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
+    this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
   }
 
   @Override
@@ -79,19 +77,22 @@ public void setRowGroupInfo(
             && !(readers[i] instanceof CometPositionColumnReader)
             && !(readers[i] instanceof CometDeleteColumnReader)) {
           readers[i].reset();
-          readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
+          readers[i].setPageReader((RowGroupReader) pageStore);
         }
       } catch (IOException e) {
         throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
       }
     }
 
+    AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
     for (int i = 0; i < readers.length; i++) {
-      delegate.getColumnReaders()[i] = this.readers[i].delegate();
+      delegateReaders[i] = readers[i].delegate();
     }
 
+    delegate.init(delegateReaders);
+
     this.rowStartPosInBatch =
-        pageStore
+        ((RowGroupReader) pageStore)
             .getRowIndexOffset()
             .orElseThrow(
                 () ->
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b13..88d691a607a7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.comet.parquet.ConstantColumnReader;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader {
     super(field);
     // use delegate to set constant value on the native side to be consumed by native execution.
     setDelegate(
-        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+        new ConstantColumnReader(
+            sparkType(),
+            CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
+            convertToSparkValue(value),
+            false));
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865e..cba108e4326e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -51,10 +51,10 @@ private static class DeleteColumnReader extends MetadataColumnReader {
     DeleteColumnReader() {
       super(
           DataTypes.BooleanType,
-          TypeUtil.convertToParquet(
+          TypeUtil.convertToParquetSpec(
              new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
           false /* useDecimal128 = false */,
-          false /* isConstant = false */);
+          false /* isConstant */);
       this.isDeleted = new boolean[0];
     }
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28d..98e80068c519 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -20,6 +20,7 @@
 
 import org.apache.comet.parquet.MetadataColumnReader;
 import org.apache.comet.parquet.Native;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.spark.sql.types.DataTypes;
@@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
     PositionColumnReader(ColumnDescriptor descriptor) {
       super(
           DataTypes.LongType,
-          descriptor,
+          CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
          false /* useDecimal128 = false */,
           false /* isConstant = false */);
     }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a727477..56f8c9bff933 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -142,6 +142,7 @@ public VectorizedReader primitive(
       return null;
     }
 
-    return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
+    return new CometColumnReader(
+        SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 780e1750a52e..57892ac4c59d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -109,6 +109,7 @@ private CloseableIterable newParquetIterable(
         // read performance as every batch read doesn't have to pay the cost of allocating memory.
         .reuseContainers()
         .withNameMapping(nameMapping())
+        .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
         .build();
   }
 
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index 9c7ea06f9938..512b80b695b0 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -269,6 +269,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
   integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
   integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
   integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+  integrationImplementation "org.apache.datafusion:comet-spark-spark3.5_2.13:${libs.versions.comet.get()}"
+
   // runtime dependencies for running Hive Catalog based integration test
   integrationRuntimeOnly project(':iceberg-hive-metastore')
 
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 81b7d83a7077..eba1a2a0fb15 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -19,18 +19,22 @@
 package org.apache.iceberg.spark.data.vectorized;
 
 import java.io.IOException;
+import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
 import org.apache.comet.parquet.AbstractColumnReader;
 import org.apache.comet.parquet.ColumnReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.RowGroupReader;
 import org.apache.comet.parquet.TypeUtil;
 import org.apache.comet.parquet.Utils;
 import org.apache.comet.shaded.arrow.memory.RootAllocator;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader {
 
   private final ColumnDescriptor descriptor;
   private final DataType sparkType;
+  private final int fieldId;
 
   // The delegated ColumnReader from Comet side
   private AbstractColumnReader delegate;
   private boolean initialized = false;
   private int batchSize = DEFAULT_BATCH_SIZE;
   private CometSchemaImporter importer;
+  private ParquetColumnSpec spec;
 
-  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
+  CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) {
     this.sparkType = sparkType;
     this.descriptor = descriptor;
+    this.fieldId = fieldId;
   }
 
   CometColumnReader(Types.NestedField field) {
     DataType dataType = SparkSchemaUtil.convert(field.type());
     StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
     this.sparkType = dataType;
-    this.descriptor = TypeUtil.convertToParquet(structField);
+    this.descriptor =
+        CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
+    this.fieldId = field.fieldId();
   }
 
   public AbstractColumnReader delegate() {
@@ -92,7 +101,26 @@ public void reset() {
     }
 
     this.importer = new CometSchemaImporter(new RootAllocator());
-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+
+    spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+
+    boolean useLegacyTime =
+        Boolean.parseBoolean(
+            SQLConf.get()
+                .getConfString(
+                    CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
+    boolean useLazyMaterialization =
+        Boolean.parseBoolean(
+            SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false"));
+    this.delegate =
+        Utils.getColumnReader(
+            sparkType,
+            spec,
+            importer,
+            batchSize,
+            true, // Comet sets this to true for native execution
+            useLazyMaterialization,
+            useLegacyTime);
 
     this.initialized = true;
   }
@@ -111,9 +139,9 @@ public DataType sparkType() {
   * <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
   * CometColumnReader#reset} is called.
   */
-  public void setPageReader(PageReader pageReader) throws IOException {
+  public void setPageReader(RowGroupReader pageStore) throws IOException {
     Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
-    ((ColumnReader) delegate).setPageReader(pageReader);
+    ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
   }
 
   @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476add..9ebe4b6396c1 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -23,7 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.comet.parquet.AbstractColumnReader;
-import org.apache.comet.parquet.BatchReader;
+import org.apache.comet.parquet.IcebergCometBatchReader;
+import org.apache.comet.parquet.RowGroupReader;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
@@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader {
   // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
   // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
   // available.
-  private final BatchReader delegate;
+  private final IcebergCometBatchReader delegate;
   private DeleteFilter deletes = null;
   private long rowStartPosInBatch = 0;
 
@@ -65,9 +66,7 @@ class CometColumnarBatchReader implements VectorizedReader {
     this.hasIsDeletedColumn =
         readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
 
-    AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
-    this.delegate = new BatchReader(abstractColumnReaders);
-    delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
+    this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
   }
 
   @Override
@@ -79,19 +78,22 @@ public void setRowGroupInfo(
             && !(readers[i] instanceof CometPositionColumnReader)
             && !(readers[i] instanceof CometDeleteColumnReader)) {
           readers[i].reset();
-          readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
+          readers[i].setPageReader((RowGroupReader) pageStore);
         }
       } catch (IOException e) {
         throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
       }
     }
 
+    AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
     for (int i = 0; i < readers.length; i++) {
-      delegate.getColumnReaders()[i] = this.readers[i].delegate();
+      delegateReaders[i] = readers[i].delegate();
     }
 
+    delegate.init(delegateReaders);
+
     this.rowStartPosInBatch =
-        pageStore
+        ((RowGroupReader) pageStore)
            .getRowIndexOffset()
            .orElseThrow(
                () ->
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b13..88d691a607a7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.comet.parquet.ConstantColumnReader;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader {
     super(field);
     // use delegate to set constant value on the native side to be consumed by native execution.
     setDelegate(
-        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+        new ConstantColumnReader(
+            sparkType(),
+            CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
+            convertToSparkValue(value),
+            false));
   }
 
   @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865e..721279786cd2 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -51,7 +51,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
     DeleteColumnReader() {
       super(
           DataTypes.BooleanType,
-          TypeUtil.convertToParquet(
+          TypeUtil.convertToParquetSpec(
              new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
           false /* useDecimal128 = false */,
           false /* isConstant = false */);
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28d..98e80068c519 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -20,6 +20,7 @@
 
 import org.apache.comet.parquet.MetadataColumnReader;
 import org.apache.comet.parquet.Native;
+import org.apache.iceberg.parquet.CometTypeUtils;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.spark.sql.types.DataTypes;
@@ -44,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
     PositionColumnReader(ColumnDescriptor descriptor) {
       super(
           DataTypes.LongType,
-          descriptor,
+          CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
          false /* useDecimal128 = false */,
           false /* isConstant = false */);
     }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a727477..56f8c9bff933 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -142,6 +142,7 @@ public VectorizedReader primitive(
       return null;
     }
 
-    return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
+    return new CometColumnReader(
+        SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 780e1750a52e..57892ac4c59d 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -109,6 +109,7 @@ private CloseableIterable newParquetIterable(
         // read performance as every batch read doesn't have to pay the cost of allocating memory.
         .reuseContainers()
         .withNameMapping(nameMapping())
+        .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
         .build();
   }
 
From ddf7511bb841e66d8eeb8065329bb3408a5272b3 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 12 Aug 2025 17:28:58 -0700
Subject: [PATCH 3/7] better message

---
 parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index f9df53b038b8..bc173c7b6411 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1359,7 +1359,7 @@ public CloseableIterable build() {
 
       if (batchedReaderFunc != null) {
         if (isComet) {
-          LOG.info("Comet enabled");
+          LOG.info("Comet vectorized reader enabled");
           return new CometVectorizedParquetReader<>(
               file,
               schema,

From 3ce3c8b75f947652cfe11c6e5a31c8720bfefe6f Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Mon, 18 Aug 2025 14:40:16 -0700
Subject: [PATCH 4/7] Pass correct field id

---
 .../java/org/apache/iceberg/parquet/CometTypeUtils.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
index ddf6c7de5ae8..8f4305ce0681 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
@@ -44,7 +44,6 @@ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor d
 
     boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
 
-    // ToDo: extract this into a Util method
     String logicalTypeName = null;
     Map logicalTypeParams = Maps.newHashMap();
     LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
@@ -76,8 +75,14 @@ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor d
       }
     }
 
+    int id = -1;
+    Type type = descriptor.getPrimitiveType();
+    if (type != null && type.getId() != null) {
+      id = type.getId().intValue();
+    }
+
     return new ParquetColumnSpec(
-        1, // ToDo: pass in the correct id
+        id,
         path,
         physicalType,
         typeLength,

From 9ad82cf3729b83e5939576609d2eaf6fc3a2efae Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 16 Sep 2025 16:44:35 -0700
Subject: [PATCH 5/7] update to use comet 0.10.0

---
 gradle/libs.versions.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3b8a60d95ac6..38205c9749a8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.40.0"
-comet = "0.10.0-SNAPSHOT"
+comet = "0.10.0
 datasketches = "6.2.0"
 delta-standalone = "3.3.2"
 delta-spark = "3.3.2"

From c02b80e12cb85398bdce095abd2fc9f3cc205238 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 16 Sep 2025 16:48:12 -0700
Subject: [PATCH 6/7] fix typo

---
 gradle/libs.versions.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 38205c9749a8..3419e708edd7 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.40.0"
-comet = "0.10.0
+comet = "0.10.0"
 datasketches = "6.2.0"
 delta-standalone = "3.3.2"
 delta-spark = "3.3.2"

From b260491bde5f81f75d4dfb33d34f16564674e0cf Mon Sep 17 00:00:00 2001
From: hsiang-c
Date: Fri, 26 Sep 2025 12:41:02 -0700
Subject: [PATCH 7/7] Close reader instance in ReadConf

---
 .../apache/iceberg/parquet/CometVectorizedParquetReader.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
index 88b195b76a2d..a3cba401827a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
@@ -140,6 +140,7 @@ private static class FileIterator implements CloseableIterator {
     private long valuesRead = 0;
     private T last = null;
     private final FileReader cometReader;
+    private ReadConf conf;
 
     FileIterator(
         ReadConf conf,
@@ -164,6 +165,7 @@ private static class FileIterator implements CloseableIterator {
           length,
           fileEncryptionKey,
           fileAADPrefix);
+      this.conf = conf;
    }
 
     private FileReader newCometReader(
@@ -250,6 +252,9 @@ private void advance() {
     public void close() throws IOException {
       model.close();
       cometReader.close();
+      if (conf != null && conf.reader() != null) {
+        conf.reader().close();
+      }
     }
   }
 }