diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 04ac69476add..23ad3893b168 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -22,8 +22,11 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import org.apache.comet.CometRuntimeException; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.BatchReader; +import org.apache.comet.vector.CometSelectionVector; +import org.apache.comet.vector.CometVector; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; @@ -148,9 +151,17 @@ ColumnarBatch loadDataToColumnBatch() { Pair pair = buildRowIdMapping(vectors); if (pair != null) { int[] rowIdMapping = pair.first(); - numLiveRows = pair.second(); - for (int i = 0; i < vectors.length; i++) { - vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); + if (pair.second() != null) { + numLiveRows = pair.second(); + for (int i = 0; i < vectors.length; i++) { + if (vectors[i] instanceof CometVector) { + vectors[i] = + new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); + } else { + throw new CometRuntimeException( + "Unsupported column vector type: " + vectors[i].getClass()); + } + } } } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 04ac69476add..23ad3893b168 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -22,8 +22,11 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import org.apache.comet.CometRuntimeException; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.BatchReader; +import org.apache.comet.vector.CometSelectionVector; +import org.apache.comet.vector.CometVector; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; @@ -148,9 +151,17 @@ ColumnarBatch loadDataToColumnBatch() { Pair pair = buildRowIdMapping(vectors); if (pair != null) { int[] rowIdMapping = pair.first(); - numLiveRows = pair.second(); - for (int i = 0; i < vectors.length; i++) { - vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); + if (pair.second() != null) { + numLiveRows = pair.second(); + for (int i = 0; i < vectors.length; i++) { + if (vectors[i] instanceof CometVector) { + vectors[i] = + new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); + } else { + throw new CometRuntimeException( + "Unsupported column vector type: " + vectors[i].getClass()); + } + } } } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java index 04ac69476add..23ad3893b168 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java @@ -22,8 +22,11 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import org.apache.comet.CometRuntimeException; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.BatchReader; +import org.apache.comet.vector.CometSelectionVector; +import org.apache.comet.vector.CometVector; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; @@ -148,9 +151,17 @@ ColumnarBatch loadDataToColumnBatch() { Pair pair = buildRowIdMapping(vectors); if (pair != null) { int[] rowIdMapping = pair.first(); - numLiveRows = pair.second(); - for (int i = 0; i < vectors.length; i++) { - vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); + if (pair.second() != null) { + numLiveRows = pair.second(); + for (int i = 0; i < vectors.length; i++) { + if (vectors[i] instanceof CometVector) { + vectors[i] = + new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); + } else { + throw new CometRuntimeException( + "Unsupported column vector type: " + vectors[i].getClass()); + } + } } } }