diff --git a/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilder.java new file mode 100644 index 000000000000..b37e755926f1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilder.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; + +/** + * A generic builder interface for creating specialized file writers in the Iceberg ecosystem. + * + *

This builder provides a unified configuration API for generating various types of content + * writers: + * + *

+ * + *

Each concrete implementation configures the underlying file format writer while adding + * content-specific metadata and behaviors. + * + * @param the concrete builder type for method chaining + */ +interface CommonWriteBuilder> { + + /** + * Set a writer configuration property which affects the writer behavior. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + B set(String property, String value); + + /** + * Adds the new properties to the writer configuration. + * + * @param properties a map of writer config properties + * @return this for method chaining + */ + default B setAll(Map properties) { + properties.forEach(this::set); + return self(); + } + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + B meta(String property, String value); + + /** + * Add the new properties to file metadata for the created file. + * + * @param properties a map of file metadata properties + * @return this for method chaining + */ + default B meta(Map properties) { + properties.forEach(this::meta); + return self(); + } + + /** Sets the metrics configuration used for collecting column metrics for the created file. */ + B metricsConfig(MetricsConfig metricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + B overwrite(); + + /** + * Sets the encryption key used for writing the file. If the writer does not support encryption, + * then an exception should be thrown. + */ + B withFileEncryptionKey(ByteBuffer encryptionKey); + + /** + * Sets the additional authentication data (AAD) prefix used for writing the file. If the writer + * does not support encryption, then an exception should be thrown. + */ + B withAADPrefix(ByteBuffer aadPrefix); + + /** Sets the partition specification for the Iceberg metadata. */ + B spec(PartitionSpec newSpec); + + /** Sets the partition value for the Iceberg metadata. */ + B partition(StructLike partition); + + /** Sets the encryption key metadata for Iceberg metadata. */ + B keyMetadata(EncryptionKeyMetadata keyMetadata); + + /** Sets the sort order for the Iceberg metadata. */ + B sortOrder(SortOrder sortOrder); + + B self(); +} diff --git a/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilderImpl.java b/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilderImpl.java new file mode 100644 index 000000000000..9e5d9b1605cb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/CommonWriteBuilderImpl.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * An internal implementation that handles all {@link CommonWriteBuilder} interface variants. + * + *

This unified implementation serves as a backend for multiple specialized content writers: + * + *

+ * + *

The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with + * content-specific functionality. When building a writer, the implementation configures the + * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate + * specialized writer for the requested content type. + * + * @param the concrete builder type for method chaining + * @param the type of data records the writer will accept + * @param the type of the schema for the input data + */ +abstract class CommonWriteBuilderImpl, D, S> + implements CommonWriteBuilder { + private final WriteBuilder writeBuilder; + private final String location; + private final FileFormat format; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + + static DataWriteBuilder forDataFile( + WriteBuilder writeBuilder, String location, FileFormat format) { + return new DataFileWriteBuilder<>(writeBuilder.content(FileContent.DATA), location, format); + } + + static EqualityDeleteWriteBuilder forEqualityDelete( + WriteBuilder writeBuilder, String location, FileFormat format) { + return new EqualityDeleteFileWriteBuilder<>( + writeBuilder.content(FileContent.EQUALITY_DELETES), location, format); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + static PositionDeleteWriteBuilder forPositionDelete( + WriteBuilder writeBuilder, String location, FileFormat format) { + return new PositionDeleteFileWriteBuilder( + (WriteBuilder) writeBuilder.content(FileContent.POSITION_DELETES), + location, + format); + } + + private CommonWriteBuilderImpl( + WriteBuilder writeBuilder, String location, FileFormat format) { + this.writeBuilder = writeBuilder; + this.location = location; + this.format = format; + } + + @Override + public B set(String property, String value) { + writeBuilder.set(property, value); + return self(); + } + + @Override + public B meta(String property, String value) { + writeBuilder.meta(property, value); + return self(); + } + + @Override + public B metricsConfig(MetricsConfig metricsConfig) { + writeBuilder.metricsConfig(metricsConfig); + return self(); + } + + @Override + public B overwrite() { + writeBuilder.overwrite(); + return self(); + } + + @Override + public B withFileEncryptionKey(ByteBuffer encryptionKey) { + writeBuilder.withFileEncryptionKey(encryptionKey); + return self(); + } + + @Override + public B withAADPrefix(ByteBuffer aadPrefix) { + writeBuilder.withAADPrefix(aadPrefix); + return self(); + } + + @Override + public B spec(PartitionSpec newSpec) { + this.spec = newSpec; + return self(); + } + + @Override + public B partition(StructLike newPartition) { + this.partition = newPartition; + return self(); + } + + @Override + public B keyMetadata(EncryptionKeyMetadata newKeyMetadata) { + this.keyMetadata = newKeyMetadata; + return self(); + } + + @Override + public B sortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return self(); + } + + private static class DataFileWriteBuilder + extends CommonWriteBuilderImpl, D, S> + implements DataWriteBuilder { + private DataFileWriteBuilder( + WriteBuilder writeBuilder, String location, FileFormat format) { + super(writeBuilder, location, format); + } + + @Override + public DataFileWriteBuilder schema(Schema schema) { + super.writeBuilder.schema(schema); + return this; + } + + @Override + public DataFileWriteBuilder inputSchema(S schema) { + super.writeBuilder.inputSchema(schema); + return this; + } + + @Override + public DataFileWriteBuilder self() { + return this; + } + + @Override + public DataWriter build() throws IOException { + Preconditions.checkArgument(super.spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null when creating data writer for partitioned spec"); + + return new DataWriter<>( + super.writeBuilder.build(), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata, + super.sortOrder); + } + } + + private static class EqualityDeleteFileWriteBuilder + extends CommonWriteBuilderImpl, D, S> + implements EqualityDeleteWriteBuilder { + private Schema rowSchema = null; + private int[] equalityFieldIds = null; + + private EqualityDeleteFileWriteBuilder( + WriteBuilder writeBuilder, String location, FileFormat format) { + super(writeBuilder, location, format); + } + + @Override + public EqualityDeleteFileWriteBuilder inputSchema(S schema) { + super.writeBuilder.inputSchema(schema); + return this; + } + + @Override + public EqualityDeleteFileWriteBuilder self() { + return this; + } + + @Override + public EqualityDeleteFileWriteBuilder rowSchema(Schema schema) { + this.rowSchema = schema; + return this; + } + + @Override + public EqualityDeleteFileWriteBuilder equalityFieldIds(int... fieldIds) { + this.equalityFieldIds = fieldIds; + return this; + } + + @Override + public EqualityDeleteWriter build() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); + Preconditions.checkArgument( + super.spec != null, "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null for partitioned writes"); + + return new EqualityDeleteWriter<>( + super.writeBuilder + .schema(rowSchema) + .meta("delete-type", "equality") + .meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))) + .build(), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata, + super.sortOrder, + equalityFieldIds); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static class PositionDeleteFileWriteBuilder + extends CommonWriteBuilderImpl + implements PositionDeleteWriteBuilder { + + private PositionDeleteFileWriteBuilder( + WriteBuilder writeBuilder, String location, FileFormat format) { + super(writeBuilder, location, format); + } + + @Override + public PositionDeleteFileWriteBuilder self() { + return this; + } + + @Override + @SuppressWarnings("unchecked") + public PositionDeleteWriter build() throws IOException { + Preconditions.checkArgument( + super.spec != null, "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null for partitioned writes"); + + return new PositionDeleteWriter<>( + new PositionDeleteFileAppender( + super.writeBuilder.meta("delete-type", "position").build()), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata); + } + } + + @SuppressWarnings("rawtypes") + private static class PositionDeleteFileAppender implements FileAppender { + private final FileAppender appender; + + PositionDeleteFileAppender(FileAppender appender) { + this.appender = appender; + } + + @Override + public void add(StructLike positionDelete) { + appender.add((PositionDelete) positionDelete); + } + + @Override + public Metrics metrics() { + return appender.metrics(); + } + + @Override + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + appender.close(); + } + + @Override + public List splitOffsets() { + return appender.splitOffsets(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/formats/DataWriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/DataWriteBuilder.java new file mode 100644 index 000000000000..d81734794874 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/DataWriteBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.DataWriter; + +/** + * A specialized builder for creating data content file writers. + * + *

This builder extends the generic {@link CommonWriteBuilder} interface with functionality + * specific to creating {@link DataWriter} instances. Data writers produce table content files + * containing actual data records stored in an Iceberg table, configured according to the table's + * schema and partition specification. + * + * @param the type of data records the writer will accept + * @param the type of the schema for the input data + */ +public interface DataWriteBuilder extends CommonWriteBuilder> { + + /** Set the file schema. */ + DataWriteBuilder schema(Schema schema); + + /** + * Sets the input schema accepted by the writer. If not provided derived from the {@link + * #schema(Schema)}. + */ + DataWriteBuilder inputSchema(S schema); + + /** + * Creates a data file writer configured with the current builder settings. + * + *

The returned {@link DataWriter} produces files that conform to the Iceberg table format, + * generating proper {@link DataFile} metadata on completion. The writer accepts input records + * exactly matching the Iceberg schema specified via {@link #schema(Schema)} for writing. + * + * @return a fully configured {@link DataWriter} instance + * @throws IOException if the writer cannot be created due to I/O errors + */ + DataWriter build() throws IOException; +} diff --git a/core/src/main/java/org/apache/iceberg/formats/EqualityDeleteWriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/EqualityDeleteWriteBuilder.java new file mode 100644 index 000000000000..25b38da22159 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/EqualityDeleteWriteBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.util.ArrayUtil; + +/** + * A specialized builder for creating equality-based delete file writers. + * + *

This builder extends the generic {@link CommonWriteBuilder} interface with functionality + * specific to creating {@link EqualityDeleteWriter} instances. + * + *

The builder provides methods to configure which fields should be used for equality comparison + * through {@link #equalityFieldIds(List)} or {@link #equalityFieldIds(int...)}, along with schema + * configuration for the delete records. + * + * @param the type of data records the writer will accept + * @param the type of the schema for the input data + */ +public interface EqualityDeleteWriteBuilder + extends CommonWriteBuilder> { + + /** + * Sets the input schema accepted by the writer. If not provided derived from the {@link + * #rowSchema(Schema)}. + */ + EqualityDeleteWriteBuilder inputSchema(S schema); + + /** Sets the row schema for the delete writers. */ + EqualityDeleteWriteBuilder rowSchema(Schema rowSchema); + + /** Sets the equality field ids for the equality delete writer. */ + default EqualityDeleteWriteBuilder equalityFieldIds(List fieldIds) { + return equalityFieldIds(ArrayUtil.toIntArray(fieldIds)); + } + + /** Sets the equality field ids for the equality delete writer. */ + EqualityDeleteWriteBuilder equalityFieldIds(int... fieldIds); + + /** + * Creates an equality-based delete file writer configured with the current builder settings. + * + *

The returned {@link EqualityDeleteWriter} produces files that identify records to be deleted + * based on field equality, generating proper {@link DeleteFile} metadata on completion. + * + *

The writer accepts input records exactly matching the input schema specified via {@link + * #rowSchema(Schema)} for deletion. + * + * @return a fully configured {@link EqualityDeleteWriter} instance + * @throws IOException if the writer cannot be created due to I/O errors + */ + EqualityDeleteWriter build() throws IOException; +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModel.java b/core/src/main/java/org/apache/iceberg/formats/FormatModel.java new file mode 100644 index 000000000000..c8164aba1d8f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModel.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations. + * + *

{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + *

Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities. + * + * @param output type used for reading data, and input type for writing data and deletes + * @param the type of the schema for the input/output data + */ +public interface FormatModel { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Return the row type class for the object model implementation processed by this factory. + * + *

The model types act as a contract specifying the expected data structures for both reading + * (converting file formats into output objects) and writing (converting input objects into file + * formats). This ensures proper integration between Iceberg's storage layer and processing + * engines. + * + *

Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return the type of the data structures handled by this model implementation + */ + Class type(); + + /** + * Return the schema type class for the object model implementation processed by this factory. + * + * @return the type of the schema for the data structures handled by this model implementation + */ + Class schemaType(); + + /** + * Creates a writer builder for data files. + * + *

The returned {@link WriteBuilder} configures and creates a writer that converts input + * objects into the file format supported by this factory. + * + * @param outputFile destination for the written data + * @return configured writer builder + */ + WriteBuilder writeBuilder(EncryptedOutputFile outputFile); + + /** + * Creates a file reader builder for the specified input file. + * + *

The returned {@link ReadBuilder} configures and creates a reader that converts data from the + * file format into output objects supported by this factory. + * + * @param inputFile source file to read from + * @return configured reader builder for the specified input + */ + ReadBuilder readBuilder(InputFile inputFile); +} diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java new file mode 100644 index 000000000000..f45bc9942cb6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A registry that manages file-format-specific readers and writers through a unified object model + * factory interface. + * + *

This registry provides access to {@link ReadBuilder}s for data consumption and various writer + * builders: + * + *

+ * + * The appropriate builder is selected based on {@link FileFormat} and object model name. + * + *

{@link FormatModel} objects are registered through {@link #register(FormatModel)} and used for + * creating readers and writers. Read builders are returned directly from the factory. Write + * builders may be wrapped in specialized content file writer implementations depending on the + * requested builder type. + */ +public final class FormatModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List CLASSES_TO_REGISTER = ImmutableList.of(); + + // Format models indexed by file format and object model class + private static final Map>, FormatModel> MODELS = + Maps.newConcurrentMap(); + + static { + registerSupportedFormats(); + } + + /** + * Registers an {@link FormatModel} in this registry. + * + *

The {@link FormatModel} creates readers and writers for a specific combinations of file + * format (Parquet, ORC, Avro) and object model (for example: "generic", "spark", "flink", etc.). + * Registering custom factories allows integration of new data processing engines for the + * supported file formats with Iceberg's file access mechanisms. + * + *

Each factory must be uniquely identified by its combination of file format and object model + * name. This uniqueness constraint prevents ambiguity when selecting factories for read and write + * operations. + * + * @param formatModel the factory implementation to register + * @throws IllegalArgumentException if a factory is already registered for the combination of + * {@link FormatModel#format()} and {@link FormatModel#type()} + */ + public static synchronized void register(FormatModel formatModel) { + Pair> key = Pair.of(formatModel.format(), formatModel.type()); + + FormatModel existing = MODELS.get(key); + Preconditions.checkArgument( + existing == null, + "Cannot register %s: %s is registered for format=%s type=%s schemaType=%s", + formatModel.getClass(), + existing == null ? null : existing.getClass(), + key.first(), + key.second(), + existing == null ? null : existing.schemaType()); + + MODELS.put(key, formatModel); + } + + /** + * Returns a reader builder for the specified file format and object model. + * + *

The returned {@link ReadBuilder} provides a fluent interface for configuring how data is + * read from the input file and converted to the output objects. + * + * @param format the file format (Parquet, Avro, ORC) that determines the parsing implementation + * @param type the output type + * @param inputFile source file to read data from + * @param the type of data records the reader will produce + * @param the type of the output schema for the reader + * @return a configured reader builder for the specified format and object model + */ + public static ReadBuilder readBuilder( + FileFormat format, Class type, InputFile inputFile) { + FormatModel factory = factoryFor(format, type); + return factory.readBuilder(inputFile); + } + + /** + * Returns a writer builder for generating a {@link DataFile}. + * + *

The returned builder produces a writer that accepts records defined by the specified object + * model and persists them using the provided file format. Unlike basic writers, this writer + * collects file metadata during the writing process and generates a {@link DataFile} that can be + * used for table operations. + * + * @param format the file format used for writing + * @param type the input type + * @param outputFile destination for the written data + * @param the type of data records the writer will accept + * @param the type of the input schema for the writer + * @return a configured data write builder for creating a {@link DataWriter} + */ + public static DataWriteBuilder dataWriteBuilder( + FileFormat format, Class type, EncryptedOutputFile outputFile) { + FormatModel factory = factoryFor(format, type); + return CommonWriteBuilderImpl.forDataFile( + factory.writeBuilder(outputFile), outputFile.encryptingOutputFile().location(), format); + } + + /** + * Creates a writer builder for generating a {@link DeleteFile} with equality deletes. + * + *

The returned builder produces a writer that accepts records defined by the specified object + * model and persists them using the given file format. The writer persists equality delete + * records that identify rows to be deleted based on the configured equality fields, producing a + * {@link DeleteFile} that can be used for table operations. + * + * @param format the file format used for writing + * @param type the input type + * @param outputFile destination for the written data + * @param the type of data records the writer will accept + * @param the type of the input schema for the writer + * @return a configured delete write builder for creating an {@link EqualityDeleteWriter} + */ + public static EqualityDeleteWriteBuilder equalityDeleteWriteBuilder( + FileFormat format, Class type, EncryptedOutputFile outputFile) { + FormatModel factory = factoryFor(format, type); + return CommonWriteBuilderImpl.forEqualityDelete( + factory.writeBuilder(outputFile), outputFile.encryptingOutputFile().location(), format); + } + + /** + * Creates a writer builder for generating a {@link DeleteFile} with position-based deletes. + * + *

The returned builder produces a writer that accepts records defined by the specified object + * model and persists them using the given file format. The writer accepts {@link PositionDelete} + * records that identify rows to be deleted by file path and position, producing a {@link + * DeleteFile} that can be used for table operations. + * + * @param format the file format used for writing + * @param outputFile destination for the written data + * @return a configured delete write builder for creating a {@link PositionDeleteWriter} + */ + @SuppressWarnings("rawtypes") + public static PositionDeleteWriteBuilder positionDeleteWriteBuilder( + FileFormat format, EncryptedOutputFile outputFile) { + FormatModel factory = factoryFor(format, PositionDelete.class); + return CommonWriteBuilderImpl.forPositionDelete( + factory.writeBuilder(outputFile), outputFile.encryptingOutputFile().location(), format); + } + + @VisibleForTesting + static Map>, FormatModel> models() { + return MODELS; + } + + @SuppressWarnings("unchecked") + private static FormatModel factoryFor(FileFormat format, Class type) { + FormatModel model = (FormatModel) MODELS.get(Pair.of(format, type)); + Preconditions.checkArgument( + model != null, "Format model is not registered for format %s and type %s", format, type); + return model; + } + + @SuppressWarnings("CatchBlockLogException") + private static void registerSupportedFormats() { + // Uses dynamic methods to call the `register` for the listed classes + for (String classToRegister : CLASSES_TO_REGISTER) { + try { + DynMethods.builder("register").impl(classToRegister).buildStaticChecked().invoke(); + } catch (NoSuchMethodException e) { + // failing to register a factory is normal and does not require a stack trace + LOG.info( + "Skip registration of {}. Likely the jar is not in the classpath", classToRegister); + } + } + } + + private FormatModelRegistry() {} +} diff --git a/core/src/main/java/org/apache/iceberg/formats/PositionDeleteWriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/PositionDeleteWriteBuilder.java new file mode 100644 index 000000000000..ee379bfc249d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/PositionDeleteWriteBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; + +/** + * A specialized builder for creating position-based delete file writers. + * + *

This builder extends the generic {@link CommonWriteBuilder} interface with functionality + * specific to creating {@link PositionDeleteWriter} instances. + */ +public interface PositionDeleteWriteBuilder extends CommonWriteBuilder { + + /** + * Creates a position-based delete file writer configured with the current builder settings. + * + *

The returned {@link PositionDeleteWriter} produces files that identify records to be deleted + * by their file path and position, generating proper {@link DeleteFile} metadata on completion. + * The writer expects {@link PositionDelete} records as input. + * + * @param Only kept for backwards compatibility, the writer expects {@link PositionDelete} + * records as input, and the actual row data is not used. + * @return a fully configured {@link PositionDeleteWriter} instance + * @throws IOException if the writer cannot be created due to I/O errors + */ + PositionDeleteWriter build() throws IOException; +} diff --git a/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java new file mode 100644 index 000000000000..20116d059c4b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mapping.NameMapping; + +/** + * Builder interface for creating file readers across supported data file formats. The {@link + * FormatModel} implementations provides appropriate {@link ReadBuilder} instances + * + *

The {@link ReadBuilder} follows the builder pattern to configure and create {@link + * CloseableIterable} instances that read data from source files. Configuration options include + * schema projection, predicate filtering, record batching, and encryption settings. + * + *

This interface is directly exposed to users for parameterizing readers. + * + * @param the output data type produced by the reader + * @param the type of the schema for the output data type + */ +public interface ReadBuilder { + /** + * Restricts the read to the given range: [start, start + length). + * + * @param start the start position for this read + * @param length the length of the range this read should scan + */ + ReadBuilder split(long start, long length); + + /** Set the projection schema. */ + ReadBuilder project(Schema schema); + + /** Sets the expected output schema. If not provided derived from the {@link #project(Schema)}. */ + ReadBuilder outputSchema(S schema); + + /** + * Configures whether filtering should be case-sensitive. If the reader supports filtering, it + * must respect this setting. The default value is true. + * + * @param caseSensitive indicates if filtering is case-sensitive + */ + ReadBuilder caseSensitive(boolean caseSensitive); + + /** + * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary + * records. Some readers may not support filtering, or may only support filtering for certain + * expressions. In this case the reader might return unfiltered or partially filtered rows. It is + * the caller's responsibility to apply the filter again. + * + * @param filter the filter to set + */ + ReadBuilder filter(Expression filter); + + /** + * Set a reader configuration property which affects the reader behavior. Reader builders should + * ignore configuration keys not known for them. + * + * @param key a reader config property name + * @param value config value + * @return this for method chaining + */ + ReadBuilder set(String key, String value); + + /** + * Sets multiple reader configuration properties that affect the reader behavior. Reader builders + * should ignore configuration keys not known for them. + * + * @param properties reader config properties to set + * @return this for method chaining + */ + default ReadBuilder setAll(Map properties) { + properties.forEach(this::set); + return this; + } + + /** Enables reusing the containers returned by the reader. Decreases pressure on GC. */ + ReadBuilder reuseContainers(); + + /** Sets the batch size for vectorized readers. */ + ReadBuilder recordsPerBatch(int rowsPerBatch); + + /** + * Contains the values in the result objects which are coming from metadata and not coming from + * the data files themselves. The keys of the map are the column ids, the values are the constant + * values to be used in the result. + */ + ReadBuilder idToConstant(Map idToConstant); + + /** Sets a mapping from external schema names to Iceberg type IDs. */ + ReadBuilder withNameMapping(NameMapping nameMapping); + + /** Builds the reader. */ + CloseableIterable build(); +} diff --git a/core/src/main/java/org/apache/iceberg/formats/WriteBuilder.java b/core/src/main/java/org/apache/iceberg/formats/WriteBuilder.java new file mode 100644 index 000000000000..f1fee495e3da --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/formats/WriteBuilder.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; + +/** + * Builder interface for creating file writers across supported data file formats. The {@link + * FormatModel} implementations provide the appropriate {@link WriteBuilder} instances. + * + *

The {@link WriteBuilder} follows the builder pattern to configure and create {@link + * FileAppender} instances that write data to the target output files. + * + *

This interface is directly exposed to users for parameterizing when only an appender is + * required. + * + * @param the output data type produced by the reader + * @param the type of the schema for the output data type + */ +public interface WriteBuilder { + /** Set the file schema. */ + WriteBuilder schema(Schema schema); + + /** + * Sets the input schema accepted by the writer. If not provided derived from the {@link + * #schema(Schema)}. + */ + WriteBuilder inputSchema(S schema); + + /** + * Set a writer configuration property which affects the writer behavior. Writer builders should + * ignore configuration keys not known for them. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + WriteBuilder set(String property, String value); + + /** + * Sets multiple writer configuration properties that affect the writer behavior. Writer builders + * should ignore configuration keys not known for them. + * + * @param properties writer config properties to set + * @return this for method chaining + */ + default WriteBuilder setAll(Map properties) { + properties.forEach(this::set); + return this; + } + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + WriteBuilder meta(String property, String value); + + /** + * Sets multiple file metadata properties in the created file. + * + * @param properties file metadata properties to set + * @return this for method chaining + */ + default WriteBuilder meta(Map properties) { + properties.forEach(this::meta); + return this; + } + + /** + * Based on the target file content the generated {@link FileAppender} needs different + * configuration. + */ + WriteBuilder content(FileContent content); + + /** Sets the metrics configuration used for collecting column metrics for the created file. */ + WriteBuilder metricsConfig(MetricsConfig metricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + WriteBuilder overwrite(); + + /** + * Sets the encryption key used for writing the file. If the writer does not support encryption, + * then an exception should be thrown. + */ + WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey); + + /** + * Sets the additional authentication data (AAD) prefix used for writing the file. If the writer + * does not support encryption, then an exception should be thrown. + */ + WriteBuilder withAADPrefix(ByteBuffer aadPrefix); + + /** Finalizes the configuration and builds the {@link FileAppender}. */ + FileAppender build() throws IOException; +} diff --git a/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java b/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java new file mode 100644 index 000000000000..24e168d3131b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/formats/TestFormatModelRegistry.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.util.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestFormatModelRegistry { + + @BeforeEach + void clearRegistry() { + FormatModelRegistry.models().clear(); + } + + @Test + void testSuccessfulRegister() { + FormatModel model = new DummyParquetFormatModel(Object.class, Object.class); + FormatModelRegistry.register(model); + assertThat(FormatModelRegistry.models()) + .containsEntry(Pair.of(FileFormat.PARQUET, Object.class), model); + } + + /** Tests that registering the same class with the same configuration updates the registration. */ + @Test + void testRegistrationForDifferentType() { + FormatModel model1 = new DummyParquetFormatModel(Object.class, Object.class); + FormatModel model2 = new DummyParquetFormatModel(Long.class, Object.class); + FormatModelRegistry.register(model1); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model1.type()))) + .isSameAs(model1); + + // Registering a new model with the different format will succeed + FormatModelRegistry.register(model2); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model1.type()))) + .isSameAs(model1); + assertThat(FormatModelRegistry.models().get(Pair.of(FileFormat.PARQUET, model2.type()))) + .isSameAs(model2); + } + + /** + * Tests that registering different classes, or different schema type for the same file format and + * type is failing. + */ + @Test + void testFailingReRegistrations() { + FormatModel model = new DummyParquetFormatModel(Object.class, Object.class); + FormatModelRegistry.register(model); + assertThat(FormatModelRegistry.models()) + .containsEntry(Pair.of(FileFormat.PARQUET, Object.class), model); + + // Registering a new model with different schema type should fail + assertThatThrownBy( + () -> + FormatModelRegistry.register( + new DummyParquetFormatModel(Object.class, String.class))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot register class"); + + // Registering a new model with null schema type should fail + assertThatThrownBy( + () -> FormatModelRegistry.register(new DummyParquetFormatModel(Object.class, null))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot register class"); + } + + private static class DummyParquetFormatModel implements FormatModel { + private final Class type; + private final Class schemaType; + + private DummyParquetFormatModel(Class type, Class schemaType) { + this.type = type; + this.schemaType = schemaType; + } + + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + @SuppressWarnings("unchecked") + public Class type() { + return (Class) type; + } + + @Override + @SuppressWarnings("unchecked") + public Class schemaType() { + return (Class) schemaType; + } + + @Override + public WriteBuilder writeBuilder(EncryptedOutputFile outputFile) { + return null; + } + + @Override + public ReadBuilder readBuilder(InputFile inputFile) { + return null; + } + } +}