From b33aec4ed58877fcdc6af8ee0d0bfe4f180ed45f Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Fri, 11 Apr 2025 15:37:59 +0200 Subject: [PATCH 01/50] Core, Data: File Format API interfaces --- build.gradle | 1 + .../apache/iceberg/io/AppenderBuilder.java | 139 +++++++++ .../org/apache/iceberg/io/ObjectModel.java | 79 ++++++ .../org/apache/iceberg/io/ReadBuilder.java | 123 ++++++++ .../apache/iceberg/data/AppenderBuilder.java | 43 +++ .../iceberg/data/DataWriterBuilder.java | 44 +++ .../data/EqualityDeleteWriterBuilder.java | 54 ++++ .../iceberg/data/FileWriterBuilderBase.java | 54 ++++ .../iceberg/data/ObjectModelRegistry.java | 224 +++++++++++++++ .../data/PositionDeleteWriterBuilder.java | 49 ++++ .../org/apache/iceberg/data/WriteBuilder.java | 264 ++++++++++++++++++ .../iceberg/data/WriterBuilderBase.java | 106 +++++++ 12 files changed, 1180 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ObjectModel.java create mode 100644 core/src/main/java/org/apache/iceberg/io/ReadBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/EqualityDeleteWriterBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/FileWriterBuilderBase.java create mode 100644 data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java create mode 100644 data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/WriteBuilder.java create mode 100644 data/src/main/java/org/apache/iceberg/data/WriterBuilderBase.java diff --git a/build.gradle b/build.gradle index ab5e9700c6aa..aee10d13dc20 100644 --- a/build.gradle +++ b/build.gradle @@ -373,6 +373,7 @@ project(':iceberg-data') { dependencies { implementation 
project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') + implementation project(':iceberg-common') implementation project(':iceberg-core') compileOnly project(':iceberg-parquet') compileOnly project(':iceberg-orc') diff --git a/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java b/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java new file mode 100644 index 000000000000..3efc06720b77 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; + +/** + * Interface which should be implemented by the data file format implementations. The {@link + * AppenderBuilder} will be parametrized based on the user provided configuration and finally the + * {@link AppenderBuilder#build(AppenderBuilder.WriteMode)} method is used to generate the appender + * for the specific writer use-cases. 
The following input should be handled by the appender in the + * specific modes: + * + * + * + * @param type returned by builder API to allow chained calls + * @param the engine specific schema of the input data + */ +public interface AppenderBuilder, E> { + /** Set the file schema. */ + B schema(Schema newSchema); + + /** + * Set a writer configuration property which affects the writer behavior. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + B set(String property, String value); + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + B meta(String property, String value); + + /** Sets the metrics configuration used for collecting column metrics for the created file. */ + B metricsConfig(MetricsConfig newMetricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + B overwrite(); + + /** + * Overwrite the file if it already exists. The default value is false. + * + * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility. + * Use {@link #overwrite()} instead. + */ + @Deprecated + B overwrite(boolean enabled); + + /** + * Sets the encryption key used for writing the file. If encryption is not supported by the reader + * then an exception should be thrown. + */ + default B fileEncryptionKey(ByteBuffer encryptionKey) { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Sets the additional authentication data prefix used for writing the file. If encryption is not + * supported by the reader then an exception should be thrown. + */ + default B aadPrefix(ByteBuffer aadPrefix) { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Sets the engine native schema for the input. 
Defines the input type when there is N to 1 + * mapping between the engine type and the Iceberg type, and providing the Iceberg schema is not + * enough for the conversion. + */ + B engineSchema(E newEngineSchema); + + /** + * Builds the {@link FileAppender} for the configured {@link WriteMode}. Could change several + * use-case specific configurations, like: + * + *
    + *
  • Mode specific writer context (typically different for data and delete files). + *
  • Writer functions to accept data rows, or {@link + * org.apache.iceberg.deletes.PositionDelete}s + *
+ */ + FileAppender build(WriteMode mode) throws IOException; + + /** + * Writer modes. Based on the mode {@link #build(WriteMode)} could alter the appender + * configuration when creating the {@link FileAppender}. + */ + enum WriteMode { + /** Mode for appending data to a file. */ + APPENDER, + /** Mode for writing data files. */ + DATA_WRITER, + /** Mode for writing equality delete files. */ + EQUALITY_DELETE_WRITER, + /** Mode for writing position delete files. */ + POSITION_DELETE_WRITER, + /** Mode for writing position delete files with row data. */ + POSITION_DELETE_WITH_ROW_WRITER + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ObjectModel.java b/core/src/main/java/org/apache/iceberg/io/ObjectModel.java new file mode 100644 index 000000000000..861d424df8a1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ObjectModel.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; + +/** + * Direct conversion is used between file formats and engine internal formats for performance + * reasons. Object models encapsulate these conversions. + * + *

{@link ReadBuilder} is provided for reading data files stored in a given {@link FileFormat} + * into the engine specific object model. + * + *

{@link AppenderBuilder} is provided for writing engine specific object model to data/delete + * files stored in a given {@link FileFormat}. + * + *

Iceberg supports the following object models natively: + * + *

    + *
  • generic - reads and writes Iceberg {@link org.apache.iceberg.data.Record}s + *
  • spark - reads and writes Spark InternalRow records + *
  • spark-vectorized - vectorized reads for Spark columnar batches. Not supported for {@link + * FileFormat#AVRO} + *
  • flink - reads and writes Flink RowData records + *
  • arrow - vectorized reads for into Arrow columnar format. Only supported for {@link + * FileFormat#PARQUET} + *
+ * + *

Engines could implement their own object models to leverage Iceberg data file reading and + * writing capabilities. + * + * @param the engine specific schema of the input data for the appender + */ +public interface ObjectModel { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * The name of the object model. Allows users to specify the object model to map the data file for + * reading and writing. + */ + String name(); + + /** + * The appender builder for the output file which writes the data in the specified file format and + * accepts the records defined by this object model. + * + * @param outputFile to write to + * @return the appender builder + * @param The type of the appender builder + */ + > B appenderBuilder(OutputFile outputFile); + + /** + * The reader builder for the input file which reads the data from the specified file format and + * returns the records in this object model. + * + * @param inputFile to read from + * @return the reader builder + * @param The type of the reader builder + */ + > B readBuilder(InputFile inputFile); +} diff --git a/core/src/main/java/org/apache/iceberg/io/ReadBuilder.java b/core/src/main/java/org/apache/iceberg/io/ReadBuilder.java new file mode 100644 index 000000000000..03c38504ad6d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ReadBuilder.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.mapping.NameMapping; + +/** + * File formats should implement this interface to provide a builder for reading data files. {@link + * ReadBuilder} reads the data files with the specified parameters. The returned objects are defined + * by the {@link ObjectModel} which is used to read the data. + * + *

This interface is directly exposed for the users to parameterize readers. + * + * @param type returned by builder API to allow chained calls + */ +public interface ReadBuilder> { + /** The configuration key for the batch size in case of vectorized reads. */ + String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch"; + + /** + * Restricts the read to the given range: [start, start + length). + * + * @param newStart the start position for this read + * @param newLength the length of the range this read should scan + */ + B split(long newStart, long newLength); + + /** Read only the given columns. */ + B project(Schema newSchema); + + /** + * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary + * records. Some readers might not be able to filter some part of the expression. In this case the + * reader might return unfiltered or partially filtered rows. It is the caller's responsibility to + * apply the filter again. + * + * @param newFilter the filter to set + * @param filterCaseSensitive whether the filtering is case-sensitive or not + */ + default B filter(Expression newFilter, boolean filterCaseSensitive) { + // Skip filtering if not available + return (B) this; + } + + /** + * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary + * records. Some readers might not be able to filter some part of the exception. In this case the + * reader might return unfiltered or partially filtered rows. It is the caller's responsibility to + * apply the filter again. The default implementation sets the filter to be case-sensitive. + * + * @param newFilter the filter to set + */ + default B filter(Expression newFilter) { + return filter(newFilter, true); + } + + /** + * Sets configuration key/value pairs for the reader. Reader builders should ignore configuration + * keys not known for them. 
+ */ + default B set(String key, String value) { + // Skip configuration if not applicable + return (B) this; + } + + /** + * Enables reusing the containers returned by the reader. Decreases pressure on GC. Readers could + * decide to ignore the user provided setting if is not supported by them. + */ + default B reuseContainers() { + // Skip container reuse configuration if not applicable + return (B) this; + } + + /** + * Accessors for constant field values. Used for calculating values in the result which are coming + * from metadata, and not coming from the data files themselves. The keys of the map are the + * column ids, the values are the accessors generating the values. + */ + B constantFieldAccessors(Map constantFieldAccessors); + + /** Sets a mapping from external schema names to Iceberg type IDs. */ + B withNameMapping(NameMapping newNameMapping); + + /** + * Sets the file encryption key used for reading the file. If encryption is not supported by the + * reader then an exception should be thrown. + */ + default B withFileEncryptionKey(ByteBuffer encryptionKey) { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Sets the additional authentication data prefix for encryption. If encryption is not supported + * by the reader then an exception should be thrown. + */ + default B withAADPrefix(ByteBuffer aadPrefix) { + throw new UnsupportedOperationException("Not supported"); + } + + /** Builds the reader. */ + CloseableIterable build(); +} diff --git a/data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java b/data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java new file mode 100644 index 000000000000..37f5e8086195 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; + +/** + * Builder for generating a {@link FileAppender}. + * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +public interface AppenderBuilder, E> + extends WriterBuilderBase { + /** + * Creates a {@link FileAppender} based on the configurations set. The appender will expect inputs + * defined by the {@link #engineSchema(Object)}} which should match the Iceberg schema defined by + * {@link #schema(Schema)}. 
+ * + * @param the type of data that the appender will handle + * @return a {@link FileAppender} instance configured with the specified settings + * @throws IOException if an I/O error occurs during the creation of the appender + */ + FileAppender appender() throws IOException; +} diff --git a/data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java b/data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java new file mode 100644 index 000000000000..6475728be5a4 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/DataWriterBuilder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.DataWriter; + +/** + * Builder for generating a {@link DataWriter}. + * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +public interface DataWriterBuilder, E> + extends FileWriterBuilderBase { + /** + * Creates a writer which generates a {@link org.apache.iceberg.DataFile} based on the + * configurations set. 
The data writer will expect inputs defined by the {@link + * #engineSchema(Object)} which should be convertible to the Iceberg schema defined by {@link + * #schema(Schema)}. + * + * @param the type of data that the writer will handle + * @return a {@link DataWriter} instance configured with the specified settings + * @throws IOException if an I/O error occurs during the creation of the writer + */ + DataWriter dataWriter() throws IOException; +} diff --git a/data/src/main/java/org/apache/iceberg/data/EqualityDeleteWriterBuilder.java b/data/src/main/java/org/apache/iceberg/data/EqualityDeleteWriterBuilder.java new file mode 100644 index 000000000000..44e07a1f8703 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/EqualityDeleteWriterBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.EqualityDeleteWriter; + +/** + * Builder for generating an {@link EqualityDeleteWriter}. 
+ * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +public interface EqualityDeleteWriterBuilder, E> + extends FileWriterBuilderBase { + /** Sets the row schema for the delete writers. */ + B withRowSchema(Schema newSchema); + + /** Sets the equality field ids for the equality delete writer. */ + B withEqualityFieldIds(List fieldIds); + + /** Sets the equality field ids for the equality delete writer. */ + B withEqualityFieldIds(int... fieldIds); + + /** + * Creates a writer which generates an equality {@link DeleteFile} based on the configurations + * set. The writer will expect inputs defined by the {@link #engineSchema(Object)} which should be + * convertible to the Iceberg schema defined by {@link #withRowSchema(Schema)}. + * + * @param the type of data that the writer will handle + * @return a {@link EqualityDeleteWriter} instance configured with the specified settings + * @throws IOException if an I/O error occurs during the creation of the writer + */ + EqualityDeleteWriter equalityDeleteWriter() throws IOException; +} diff --git a/data/src/main/java/org/apache/iceberg/data/FileWriterBuilderBase.java b/data/src/main/java/org/apache/iceberg/data/FileWriterBuilderBase.java new file mode 100644 index 000000000000..1b1b15f1d669 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/FileWriterBuilderBase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; + +/** + * Builder for generating one of the following: + * + *

    + *
  • {@link DataWriter} + *
  • {@link EqualityDeleteWriter} + *
  • {@link PositionDeleteWriter} + *
+ * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +interface FileWriterBuilderBase, E> + extends WriterBuilderBase { + /** Sets the partition specification for the Iceberg metadata. */ + B withSpec(PartitionSpec newSpec); + + /** Sets the partition value for the Iceberg metadata. */ + B withPartition(StructLike newPartition); + + /** Sets the encryption key metadata for Iceberg metadata. */ + B withKeyMetadata(EncryptionKeyMetadata metadata); + + /** Sets the sort order for the Iceberg metadata. */ + B withSortOrder(SortOrder newSortOrder); +} diff --git a/data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java b/data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java new file mode 100644 index 000000000000..1cb1ac98c84d --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ObjectModel; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and writer builders ({@link + * org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link + * EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). Based on the `file format` + * and the requested `object model name` the registry returns the correct reader and writer + * builders. These builders could be used to generate the readers and writers. + * + *

The available {@link ObjectModel}s are registered by the {@link + * #registerObjectModel(ObjectModel)} method. These {@link ObjectModel}s will be used to create the + * {@link ReadBuilder}s and the {@link AppenderBuilder}s. The former ones are returned directly, the + * later ones are wrapped in the appropriate writer builder implementations. + */ +public final class ObjectModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map> OBJECT_MODELS = Maps.newConcurrentMap(); + + /** + * Registers a new object model. + * + * @param objectModel the object model + * @throws IllegalArgumentException if an object model for the given {@code format} and {@code + * objectModelName} combination already exists + */ + @SuppressWarnings("CatchBlockLogException") + public static void registerObjectModel(ObjectModel objectModel) { + try { + Key key = new Key(objectModel.format(), objectModel.name()); + if (OBJECT_MODELS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Object model %s clashes with %s. 
Both serves %s", + objectModel.getClass(), OBJECT_MODELS.get(key), key)); + } + + OBJECT_MODELS.put(key, objectModel); + } catch (RuntimeException e) { + // failing to register an object model is normal and does not require a stack trace + LOG.info( + "Unable to use register object model {} for data files: {}", objectModel, e.getMessage()); + } + } + + @SuppressWarnings("CatchBlockLogException") + private static void registerSupportedFormats() { + // Uses dynamic methods to call the `register` for the listed classes + for (String classToRegister : CLASSES_TO_REGISTER) { + try { + DynMethods.builder("register").impl(classToRegister).buildStaticChecked().invoke(); + } catch (NoSuchMethodException e) { + // failing to register an object model is normal and does not require a stack trace + LOG.info("Unable to register {} for data files: {}", classToRegister, e.getMessage()); + } + } + } + + static { + registerSupportedFormats(); + } + + private ObjectModelRegistry() {} + + /** + * Provides a reader builder for the given input file which returns objects generated by the given + * object model. + * + * @param format of the file to read + * @param objectModelName name of the object model used to generate the reader + * @param inputFile to read + * @return {@link ReadBuilder} for building the actual reader + */ + public static ReadBuilder readBuilder( + FileFormat format, String objectModelName, InputFile inputFile) { + return OBJECT_MODELS.get(new Key(format, objectModelName)).readBuilder(inputFile); + } + + /** + * Provides an appender builder for the given output file which writes a data file with a given + * file format and expects input records defined by the object model. 
+ * + * @param format of the file to write + * @param objectModelName name of the object model used to generate the writer + * @param outputFile to write + * @param type for the engine specific schema expected by the appender + * @return {@link ReadBuilder} for building the actual reader + */ + public static org.apache.iceberg.data.AppenderBuilder appenderBuilder( + FileFormat format, String objectModelName, EncryptedOutputFile outputFile) { + return writerFor(format, objectModelName, outputFile); + } + + /** + * Provides a data writer builder for the given output file which writes a data file with a given + * file format and expects input records defined by the object model. + * + * @param format of the file to write + * @param objectModelName accepted by the writer + * @param outputFile to write + * @param type for the engine specific schema expected by the writer + * @return {@link ReadBuilder} for building the actual reader + */ + public static DataWriterBuilder writerBuilder( + FileFormat format, String objectModelName, EncryptedOutputFile outputFile) { + return writerFor(format, objectModelName, outputFile); + } + + /** + * Provides an equality delete writer builder for the given output file which writes a delete file + * with a given file format and expects input records defined by the object model. 
+ * + * @param format of the file to write + * @param objectModelName accepted by the writer + * @param outputFile to write + * @param type for the engine specific schema expected by the writer + * @return {@link ReadBuilder} for building the actual reader + */ + public static EqualityDeleteWriterBuilder equalityDeleteWriterBuilder( + FileFormat format, String objectModelName, EncryptedOutputFile outputFile) { + return writerFor(format, objectModelName, outputFile); + } + + /** + * Provides a position delete writer builder for the given output file which writes a delete file + * with a given file format and expects input records defined by the object model when the row + * data is written. + * + * @param format of the file to write + * @param objectModelName accepted by the writer + * @param outputFile to write + * @param type for the engine specific schema expected by the writer + * @return {@link ReadBuilder} for building the actual writer + */ + public static PositionDeleteWriterBuilder positionDeleteWriterBuilder( + FileFormat format, String objectModelName, EncryptedOutputFile outputFile) { + return writerFor(format, objectModelName, outputFile); + } + + @SuppressWarnings("unchecked") + private static , E> WriteBuilder writerFor( + FileFormat format, String objectModelName, EncryptedOutputFile outputFile) { + return new WriteBuilder<>( + ((ObjectModel) OBJECT_MODELS.get(new Key(format, objectModelName))) + .appenderBuilder(outputFile.encryptingOutputFile()), + outputFile.encryptingOutputFile().location(), + format); + } + + /** Key used to identify readers and writers in the {@link ObjectModelRegistry}. 
*/ + private static class Key { + private final FileFormat fileFormat; + private final String objectModelName; + + private Key(FileFormat fileFormat, String objectModelName) { + this.fileFormat = fileFormat; + this.objectModelName = objectModelName; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("fileFormat", fileFormat) + .add("objectModelName", objectModelName) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof Key)) { + return false; + } + + Key other = (Key) o; + return Objects.equal(other.fileFormat, fileFormat) + && Objects.equal(other.objectModelName, objectModelName); + } + + @Override + public int hashCode() { + return Objects.hashCode(fileFormat, objectModelName); + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java b/data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java new file mode 100644 index 000000000000..ee43032a3c72 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.PositionDeleteWriter; + +/** + * Builder for generating a {@link PositionDeleteWriter}. + * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +public interface PositionDeleteWriterBuilder, E> + extends FileWriterBuilderBase { + /** Sets the row schema for the delete writers. */ + B withRowSchema(Schema newSchema); + + /** + * Creates a writer which generates a position {@link DeleteFile} based on the configurations set. + * The writer will expect {@link org.apache.iceberg.deletes.PositionDelete} records. If {@link + * #withRowSchema(Schema)} is set then the positional delete records should contain delete rows + * specified by the {@link #engineSchema(Object)}. The provided engine schema should be + * convertible to the Iceberg schema defined by {@link #withRowSchema(Schema)}. + * + * @param the type of data that the writer will handle + * @return a {@link PositionDeleteWriter} instance configured with the specified settings + * @throws IOException if an I/O error occurs during the creation of the writer + */ + PositionDeleteWriter positionDeleteWriter() throws IOException; +} diff --git a/data/src/main/java/org/apache/iceberg/data/WriteBuilder.java b/data/src/main/java/org/apache/iceberg/data/WriteBuilder.java new file mode 100644 index 000000000000..1f1f1f786e08 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/WriteBuilder.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ArrayUtil; + +/** + * Builder implementation for generating the different writer interfaces. The builder is an internal + * class and could change without notice. Use one of the following specific interfaces: + * + *

    + *
  • {@link FileAppender} + *
  • {@link DataWriter} + *
  • {@link EqualityDeleteWriter} + *
  • {@link PositionDeleteWriter} + *
+ * + * The builder wraps the file format specific {@link AppenderBuilder}. To allow further engine and + * file format specific configuration changes for the given writer the {@link + * AppenderBuilder#build(AppenderBuilder.WriteMode)} method is called with the correct parameter to + * create the appender used internally to provide the required functionality. + * + * @param type of the appender + * @param engine specific schema of the input records used for appender initialization + */ +@SuppressWarnings("unchecked") +class WriteBuilder, A extends AppenderBuilder, E> + implements org.apache.iceberg.data.AppenderBuilder, + DataWriterBuilder, + EqualityDeleteWriterBuilder, + PositionDeleteWriterBuilder { + private final AppenderBuilder appenderBuilder; + private final String location; + private final FileFormat format; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + private Schema rowSchema = null; + private int[] equalityFieldIds = null; + + WriteBuilder(AppenderBuilder appenderBuilder, String location, FileFormat format) { + this.appenderBuilder = appenderBuilder; + this.location = location; + this.format = format; + } + + @Override + public B schema(Schema newSchema) { + appenderBuilder.schema(newSchema); + return (B) this; + } + + @Override + public B engineSchema(E engineSchema) { + appenderBuilder.engineSchema(engineSchema); + return (B) this; + } + + @Override + public B set(String property, String value) { + appenderBuilder.set(property, value); + return (B) this; + } + + @Override + public B set(Map properties) { + properties.forEach(appenderBuilder::set); + return (B) this; + } + + @Override + public B meta(String property, String value) { + appenderBuilder.meta(property, value); + return (B) this; + } + + @Override + public B meta(Map properties) { + properties.forEach(appenderBuilder::meta); + return (B) this; + } + + @Override + public 
B metricsConfig(MetricsConfig newMetricsConfig) { + appenderBuilder.metricsConfig(newMetricsConfig); + return (B) this; + } + + @Override + public B overwrite() { + appenderBuilder.overwrite(); + return (B) this; + } + + @Override + public B fileEncryptionKey(ByteBuffer encryptionKey) { + appenderBuilder.fileEncryptionKey(encryptionKey); + return (B) this; + } + + @Override + public B aadPrefix(ByteBuffer aadPrefix) { + appenderBuilder.aadPrefix(aadPrefix); + return (B) this; + } + + @Override + public B withRowSchema(Schema newSchema) { + this.rowSchema = newSchema; + return (B) this; + } + + @Override + public B withEqualityFieldIds(List fieldIds) { + this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds); + return (B) this; + } + + @Override + public B withEqualityFieldIds(int... fieldIds) { + this.equalityFieldIds = fieldIds; + return (B) this; + } + + @Override + public B withSpec(PartitionSpec newSpec) { + this.spec = newSpec; + return (B) this; + } + + @Override + public B withPartition(StructLike newPartition) { + this.partition = newPartition; + return (B) this; + } + + @Override + public B withKeyMetadata(EncryptionKeyMetadata metadata) { + this.keyMetadata = metadata; + return (B) this; + } + + @Override + public B withSortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return (B) this; + } + + @Override + public FileAppender appender() throws IOException { + return appenderBuilder.build(AppenderBuilder.WriteMode.APPENDER); + } + + @Override + public DataWriter dataWriter() throws IOException { + Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating data writer for partitioned spec"); + + return new DataWriter<>( + appenderBuilder.build(AppenderBuilder.WriteMode.DATA_WRITER), + format, + location, + spec, + partition, + keyMetadata, + sortOrder); + } + + @Override + public 
EqualityDeleteWriter equalityDeleteWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + return new EqualityDeleteWriter<>( + appenderBuilder + .schema(rowSchema) + .meta("delete-type", "equality") + .meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))) + .build(AppenderBuilder.WriteMode.EQUALITY_DELETE_WRITER), + format, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds); + } + + @Override + public PositionDeleteWriter positionDeleteWriter() throws IOException { + Preconditions.checkState( + equalityFieldIds == null, "Cannot create position delete file using delete field ids"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + return new PositionDeleteWriter<>( + appenderBuilder + .meta("delete-type", "position") + .schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)) + .build( + rowSchema != null + ? 
AppenderBuilder.WriteMode.POSITION_DELETE_WITH_ROW_WRITER + : AppenderBuilder.WriteMode.POSITION_DELETE_WRITER), + format, + location, + spec, + partition, + keyMetadata); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/WriterBuilderBase.java b/data/src/main/java/org/apache/iceberg/data/WriterBuilderBase.java new file mode 100644 index 000000000000..c21660a365f7 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/WriterBuilderBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; + +/** + * Builder for generating one of the following: + * + *
    + *
  • {@link FileAppender} + *
  • {@link DataWriter} + *
  • {@link EqualityDeleteWriter} + *
  • {@link PositionDeleteWriter} + *
+ * + * @param type of the builder + * @param engine specific schema of the input records used for appender initialization + */ +interface WriterBuilderBase, E> { + + /** Set the file schema. */ + B schema(Schema newSchema); + + /** + * Sets the engine specific schema for the input. Used by the {@link + * AppenderBuilder#build(AppenderBuilder.WriteMode)} to configure the engine specific converters. + */ + B engineSchema(E engineSchema); + + /** + * Set a writer configuration property which affects the writer behavior. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + B set(String property, String value); + + /** + * Adds the new properties to the writer configuration. + * + * @param properties a map of writer config properties + * @return this for method chaining + */ + B set(Map properties); + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + B meta(String property, String value); + + /** + * Add the new properties to file metadata for the created file. + * + * @param properties a map of file metadata properties + * @return this for method chaining + */ + B meta(Map properties); + + /** Sets the metrics configuration used for collecting column metrics for the created file. */ + B metricsConfig(MetricsConfig newMetricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + B overwrite(); + + /** + * Sets the encryption key used for writing the file. If encryption is not supported by the writer + * then an exception should be thrown. + */ + B fileEncryptionKey(ByteBuffer encryptionKey); + + /** + * Sets the additional authentication data prefix used for writing the file. If encryption is not + * supported by the writer then an exception should be thrown. 
+ */ + B aadPrefix(ByteBuffer aadPrefix); +} From 5b79dbd87e835ba7f3a901a689b2968f1f2aa369 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 29 Apr 2025 12:33:23 +0200 Subject: [PATCH 02/50] Renjie's comments --- .../main/java/org/apache/iceberg/io/AppenderBuilder.java | 3 --- .../java/org/apache/iceberg/data/ObjectModelRegistry.java | 6 +++--- .../src/main/java/org/apache/iceberg/data/WriteBuilder.java | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java b/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java index 3efc06720b77..357bb8ef74cb 100644 --- a/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java +++ b/core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java @@ -33,7 +33,6 @@ *