diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 92bcb8c70891..5dabbbc6fff7 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -38,6 +38,7 @@ The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writ * Commit coordination for centralized Iceberg commits * Exactly-once delivery semantics * Multi-table fan-out +* Change data capture * Automatic table creation and schema evolution * Field name mapping via Iceberg’s column mapping functionality @@ -58,35 +59,37 @@ for exactly-once semantics. This requires Kafka 2.5 or later. ## Configuration -| Property | Description | -|--------------------------------------------|------------------------------------------------------------------------------------------------------------------| -| iceberg.tables | Comma-separated list of destination tables | -| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` | -| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables | -| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified | -| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) | -| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables | -| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` | -| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` | -| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema | -| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive | -| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create | -| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence | -| iceberg.table.\.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified | -| iceberg.table.\
<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
-| iceberg.table.\
<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
-| iceberg.table.\
<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
-| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
-| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` |
-| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
-| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
-| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
-| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
-| iceberg.catalog | Name of the catalog, default is `iceberg` |
-| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
-| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
-| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
-| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
+| Property | Description |
+|---------------------------------------------------|------------------------------------------------------------------------------------------------------------------|
+| iceberg.tables | Comma-separated list of destination tables |
+| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
+| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
+| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
+| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
+| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
+| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
+| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
+| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
+| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive |
+| iceberg.tables.cdc-field | Source record field that identifies the type of operation (insert, update, or delete) |
+| iceberg.tables.upsert-mode-enabled | Set to `true` to treat all appends as upserts, default is `false` |
+| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
+| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
+| iceberg.table.\
<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
+| iceberg.table.\
<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
+| iceberg.table.\
<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
+| iceberg.table.\<table name\>
.route-regex | The regex used to match a record's `routeField` to a table | +| iceberg.control.topic | Name of the control topic, default is `control-iceberg` | +| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` | +| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) | +| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) | +| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) | +| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix | +| iceberg.catalog | Name of the catalog, default is `iceberg` | +| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization | +| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded | +| iceberg.hadoop.* | Properties passed through to the Hadoop configuration | +| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization | If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If `iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will @@ -353,6 +356,35 @@ See above for creating two tables. } ``` +### Change data capture +This example applies inserts, updates, and deletes based on the value of a field in the record. +For example, if the `_cdc_op` field is set to `I` then the record is inserted, if `U` then it is +upserted, and if `D` then it is deleted. This requires that the table be in Iceberg v2 format. +The Iceberg identifier field(s) are used to identify a row, if that is not set for the table, +then the `iceberg.tables.default-id-columns` or `iceberg.table.\
.id-columns`configuration +can be set instead. CDC can be combined with multi-table fan-out. + +#### Create the destination table +See above for creating the table + +#### Connector config +```json +{ +"name": "events-sink", +"config": { + "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector", + "tasks.max": "2", + "topics": "events", + "iceberg.tables": "default.events", + "iceberg.tables.cdc-field": "_cdc_op", + "iceberg.catalog.type": "rest", + "iceberg.catalog.uri": "https://localhost", + "iceberg.catalog.credential": "", + "iceberg.catalog.warehouse": "" + } +} +``` + ## SMTs for the Apache Iceberg Sink Connector This project contains some SMTs that could be useful when transforming Kafka data for use by diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index a4e15932f1a3..60964b665023 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -68,6 +68,9 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TABLES_PROP = "iceberg.tables"; private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled"; private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field"; + private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field"; + private static final String TABLES_UPSERT_MODE_ENABLED_PROP = + "iceberg.tables.upsert-mode-enabled"; private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by"; @@ -129,6 +132,12 @@ private static ConfigDef newConfigDef() { null, Importance.MEDIUM, "Source record field for routing records to tables"); + configDef.define( + TABLES_CDC_FIELD_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Source record field that identifies the type of operation (insert, update, or delete)"); configDef.define( TABLES_DEFAULT_COMMIT_BRANCH, ConfigDef.Type.STRING, @@ -159,6 +168,12 @@ private static ConfigDef newConfigDef() { false, Importance.MEDIUM, "Set to true to set columns as optional during table create and evolution, false to respect schema"); + configDef.define( + TABLES_UPSERT_MODE_ENABLED_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to treat all appends as upserts, false otherwise"); configDef.define( TABLES_SCHEMA_CASE_INSENSITIVE_PROP, ConfigDef.Type.BOOLEAN, @@ -361,6 +376,10 @@ public TableSinkConfig tableConfig(String tableName) { }); } + public String tablesCdcField() { + return getString(TABLES_CDC_FIELD_PROP); + } + @VisibleForTesting static List stringToList(String value, String regex) { if (value == null || value.isEmpty()) { @@ -397,6 +416,10 @@ public int commitTimeoutMs() { return getInt(COMMIT_TIMEOUT_MS_PROP); } + public boolean isUpsertMode() { + return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP); + } + public int commitThreads() { return getInt(COMMIT_THREADS_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java new file 
mode 100644 index 000000000000..b3721a5f982f --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; + +abstract class BaseDeltaTaskWriter extends BaseTaskWriter { + + private final Schema schema; + private final Schema deleteSchema; + private final InternalRecordWrapper wrapper; + private final InternalRecordWrapper keyWrapper; + private final RecordProjection keyProjection; + private final boolean upsertMode; + + BaseDeltaTaskWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.schema = schema; + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds)); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); + this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct()); + this.keyProjection = RecordProjection.create(schema, deleteSchema); + this.upsertMode = upsertMode; + } + + abstract RowDataDeltaWriter route(Record row); + + InternalRecordWrapper wrapper() { + return wrapper; + } + + @Override + public void write(Record row) throws IOException { + Operation op; + if (row instanceof RecordWrapper) { + op = ((RecordWrapper) row).op(); + } else { + if (upsertMode) { + op = Operation.UPDATE; + } else { + op = Operation.INSERT; + } + } + + RowDataDeltaWriter writer = route(row); + switch (op) { + case INSERT: + writer.write(row); + break; + case DELETE: + writer.deleteKey(keyProjection.wrap(row)); + break; + default: + writer.deleteKey(keyProjection.wrap(row)); + writer.write(row); + break; + } + } + + class RowDataDeltaWriter extends BaseEqualityDeltaWriter { + + RowDataDeltaWriter(PartitionKey partition) { + super(partition, schema, deleteSchema); + } + + @Override + protected StructLike asStructLike(Record data) { + return wrapper.wrap(data); + } 
+ + @Override + protected StructLike asStructLikeKey(Record data) { + return keyWrapper.wrap(data); + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java new file mode 100644 index 000000000000..7f428a6dc2b6 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +public enum Operation { + INSERT, + UPDATE, + DELETE +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java new file mode 100644 index 000000000000..47ce0cf36049 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; + +public class PartitionedDeltaWriter extends BaseDeltaTaskWriter { + private final PartitionKey partitionKey; + + private final Map writers = Maps.newHashMap(); + + PartitionedDeltaWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super( + spec, + format, + appenderFactory, + fileFactory, + io, + targetFileSize, + schema, + identifierFieldIds, + upsertMode); + this.partitionKey = new PartitionKey(spec, schema); + } + + @Override + RowDataDeltaWriter route(Record row) { + partitionKey.partition(wrapper().wrap(row)); + + RowDataDeltaWriter writer = writers.get(partitionKey); + if (writer == null) { + // NOTICE: we need to copy a new partition key here, in case of messing up the keys in + // writers. + PartitionKey copiedKey = partitionKey.copy(); + writer = new RowDataDeltaWriter(copiedKey); + writers.put(copiedKey, writer); + } + + return writer; + } + + @Override + public void close() { + try { + Tasks.foreach(writers.values()) + .throwFailureWhenFinished() + .noRetry() + .run(RowDataDeltaWriter::close, IOException.class); + + writers.clear(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close equality delta writer", e); + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java new file mode 100644 index 000000000000..79ce2c111a3a --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; + +/** + * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types. + */ +public class RecordProjection implements Record { + + /** + * Creates a projecting wrapper for {@link Record} rows. + * + *

This projection does not work with repeated types like lists and maps. + * + * @param dataSchema schema of rows wrapped by this projection + * @param projectedSchema result schema of the projected rows + * @return a wrapper to project rows + */ + public static RecordProjection create(Schema dataSchema, Schema projectedSchema) { + return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct()); + } + + private final StructType type; + private final int[] positionMap; + private final RecordProjection[] nestedProjections; + private Record record; + + private RecordProjection(StructType structType, StructType projection) { + this(structType, projection, false); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private RecordProjection(StructType structType, StructType projection, boolean allowMissing) { + this.type = projection; + this.positionMap = new int[projection.fields().size()]; + this.nestedProjections = new RecordProjection[projection.fields().size()]; + + // set up the projection positions and any nested projections that are needed + List dataFields = structType.fields(); + for (int pos = 0; pos < positionMap.length; pos += 1) { + NestedField projectedField = projection.fields().get(pos); + + boolean found = false; + for (int i = 0; !found && i < dataFields.size(); i += 1) { + NestedField dataField = dataFields.get(i); + if (projectedField.fieldId() == dataField.fieldId()) { + found = true; + positionMap[pos] = i; + switch (projectedField.type().typeId()) { + case STRUCT: + nestedProjections[pos] = + new RecordProjection( + dataField.type().asStructType(), projectedField.type().asStructType()); + break; + case MAP: + MapType projectedMap = projectedField.type().asMapType(); + MapType originalMap = dataField.type().asMapType(); + + boolean keyProjectable = + !projectedMap.keyType().isNestedType() + || projectedMap.keyType().equals(originalMap.keyType()); + boolean valueProjectable = + !projectedMap.valueType().isNestedType() + || projectedMap.valueType().equals(originalMap.valueType()); + Preconditions.checkArgument( + keyProjectable && valueProjectable, + "Cannot project a partial map key or value struct. Trying to project %s out of %s", + projectedField, + dataField); + + nestedProjections[pos] = null; + break; + case LIST: + ListType projectedList = projectedField.type().asListType(); + ListType originalList = dataField.type().asListType(); + + boolean elementProjectable = + !projectedList.elementType().isNestedType() + || projectedList.elementType().equals(originalList.elementType()); + Preconditions.checkArgument( + elementProjectable, + "Cannot project a partial list element struct. Trying to project %s out of %s", + projectedField, + dataField); + + nestedProjections[pos] = null; + break; + default: + nestedProjections[pos] = null; + } + } + } + + if (!found && projectedField.isOptional() && allowMissing) { + positionMap[pos] = -1; + nestedProjections[pos] = null; + } else if (!found) { + throw new IllegalArgumentException( + String.format("Cannot find field %s in %s", projectedField, structType)); + } + } + } + + public RecordProjection wrap(Record newRecord) { + this.record = newRecord; + return this; + } + + @Override + public int size() { + return type.fields().size(); + } + + @Override + public T get(int pos, Class javaClass) { + // struct can be null if wrap is not called first before the get call + // or if a null struct is wrapped. 
+ if (record == null) { + return null; + } + + int recordPos = positionMap[pos]; + if (nestedProjections[pos] != null) { + Record nestedStruct = record.get(recordPos, Record.class); + if (nestedStruct == null) { + return null; + } + + return javaClass.cast(nestedProjections[pos].wrap(nestedStruct)); + } + + if (recordPos != -1) { + return record.get(recordPos, javaClass); + } else { + return null; + } + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException(); + } + + @Override + public StructType struct() { + return type; + } + + @Override + public Object getField(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void setField(String name, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public Object get(int pos) { + return get(pos, Object.class); + } + + @Override + public Record copy() { + throw new UnsupportedOperationException(); + } + + @Override + public Record copy(Map overwriteValues) { + throw new UnsupportedOperationException(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java index 5ac930739738..1cd4d08fa999 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java @@ -154,19 +154,47 @@ public static TaskWriter createTableWriter( TaskWriter writer; if (table.spec().isUnpartitioned()) { - writer = - new UnpartitionedWriter<>( - table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); + if (config.tablesCdcField() == null && !config.isUpsertMode()) { + writer = + new UnpartitionedWriter<>( + table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); + } else { + writer = + new UnpartitionedDeltaWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema(), + identifierFieldIds, + config.isUpsertMode()); + } } else { - writer = - new PartitionedAppendWriter( - table.spec(), - format, - appenderFactory, - fileFactory, - table.io(), - targetFileSize, - table.schema()); + if (config.tablesCdcField() == null && !config.isUpsertMode()) { + writer = + new PartitionedAppendWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema()); + } else { + writer = + new PartitionedDeltaWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema(), + identifierFieldIds, + config.isUpsertMode()); + } } return writer; } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java new file mode 100644 index 000000000000..915608562034 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Map; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types.StructType; + +public class RecordWrapper implements Record { + + private final Record delegate; + private final Operation op; + + public RecordWrapper(Record delegate, Operation op) { + this.delegate = delegate; + this.op = op; + } + + public Operation op() { + return op; + } + + @Override + public StructType struct() { + return delegate.struct(); + } + + @Override + public Object getField(String name) { + return delegate.getField(name); + } + + @Override + public void setField(String name, Object value) { + delegate.setField(name, value); + } + + @Override + public Object get(int pos) { + return delegate.get(pos); + } + + @Override + public Record copy() { + return new RecordWrapper(delegate.copy(), op); + } + + @Override + public Record copy(Map overwriteValues) { + return new RecordWrapper(delegate.copy(overwriteValues), op); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public T get(int pos, Class javaClass) { + return delegate.get(pos, javaClass); + } + + @Override + public void set(int pos, T value) { + delegate.set(pos, value); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java new file mode 100644 index 000000000000..46b0a45d532d --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; + +public class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { + private final RowDataDeltaWriter writer; + + UnpartitionedDeltaWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super( + spec, + format, + appenderFactory, + fileFactory, + io, + targetFileSize, + schema, + identifierFieldIds, + upsertMode); + this.writer = new RowDataDeltaWriter(null); + } + + @Override + RowDataDeltaWriter route(Record row) { + return writer; + } + + @Override + public void close() throws IOException { + writer.close(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java new file mode 100644 index 000000000000..3828b3ad6c86 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/WriterResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types.StructType; +import org.apache.kafka.common.TopicPartition; + +public class WriterResult { + + private final TableIdentifier tableIdentifier; + private final List dataFiles; + private final List deleteFiles; + private final StructType partitionStruct; + private final Map offsets; + + public WriterResult( + TableIdentifier tableIdentifier, + List dataFiles, + List deleteFiles, + StructType partitionStruct, + Map offsets) { + this.tableIdentifier = tableIdentifier; + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + this.partitionStruct = partitionStruct; + this.offsets = offsets; + } + + public TableIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public List getDataFiles() { + return dataFiles; + } + + public List getDeleteFiles() { + return deleteFiles; + } + + public StructType getPartitionStruct() { + return partitionStruct; + } + + public Map getOffsets() { + return offsets; + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java index ac44952a5c15..9696ad528e50 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/BaseWriterTest.java @@ -69,11 +69,12 @@ public void before() { .thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of())); when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance()); when(table.properties()).thenReturn(ImmutableMap.of()); + when(table.name()).thenReturn("name"); } protected WriteResult writeTest( List rows, IcebergSinkConfig config, Class expectedWriterClass) { - try (TaskWriter writer = RecordUtils.createTableWriter(table, "name", config)) { + try (TaskWriter writer = RecordUtils.createTableWriter(table, table.name(), config)) { assertThat(writer.getClass()).isEqualTo(expectedWriterClass); rows.forEach( diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java new file mode 100644 index 000000000000..4d44147267b6 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.regex.Pattern; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +public class PartitionedDeltaWriterTest extends BaseWriterTest { + + @Test + public void testPartitionedDeltaWriter() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.isUpsertMode()).thenReturn(true); + when(config.tableConfig(table.name())) + .thenReturn(new TableSinkConfig(Pattern.compile(""), List.of(), List.of(), "")); + + when(table.spec()).thenReturn(SPEC); + + Record row1 = GenericRecord.create(SCHEMA); + row1.setField("id", 123L); + row1.setField("data", "hello world!"); + row1.setField("id2", 123L); + + Record row2 = GenericRecord.create(SCHEMA); + row2.setField("id", 234L); + row2.setField("data", "foobar"); + row2.setField("id2", 234L); + + WriteResult result = + writeTest(ImmutableList.of(row1, row2), config, PartitionedDeltaWriter.class); + + // in upsert mode, each write is a delete + append, so we'll have 1 data file + // and 1 delete file for each partition (2 total) + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.deleteFiles()).hasSize(2); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java new file mode 100644 index 000000000000..b7ec1e02f94d --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.regex.Pattern; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +public class UnpartitionedDeltaWriterTest extends BaseWriterTest { + + @Test + public void testUnpartitionedDeltaWriter() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.isUpsertMode()).thenReturn(true); + when(config.tableConfig(table.name())) + .thenReturn(new TableSinkConfig(Pattern.compile(""), List.of(), List.of(), "")); + + Record row = GenericRecord.create(SCHEMA); + row.setField("id", 123L); + row.setField("data", "hello world!"); + row.setField("id2", 123L); + + WriteResult result = writeTest(ImmutableList.of(row), config, UnpartitionedDeltaWriter.class); + + // in upsert mode, each write is a delete + append, so we'll have 1 data file + // and 1 delete file + assertThat(result.dataFiles()).hasSize(1); + assertThat(result.deleteFiles()).hasSize(1); + } +}
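The Change data capture section above leaves implicit how a value such as `I`, `U`, or `D` in the configured `iceberg.tables.cdc-field` turns into the writer behavior implemented in `BaseDeltaTaskWriter`. The sketch below makes that mapping explicit; it is a standalone illustration, and the class, enum, and method names (`CdcDispatchSketch`, `CdcOp`, `fromCdcField`, `apply`) as well as the handling of unknown codes are assumptions, not connector APIs.

```java
public class CdcDispatchSketch {

  enum CdcOp { INSERT, UPDATE, DELETE }

  // Map the raw CDC field value to an operation; treating anything other than
  // "D" or "U" as an insert is an assumption made only for this sketch.
  static CdcOp fromCdcField(String value) {
    if ("D".equalsIgnoreCase(value)) {
      return CdcOp.DELETE;
    } else if ("U".equalsIgnoreCase(value)) {
      return CdcOp.UPDATE;
    }
    return CdcOp.INSERT;
  }

  // Mirrors the dispatch in BaseDeltaTaskWriter#write: an update is applied as an
  // equality delete on the identifier columns followed by an append of the new row.
  static void apply(CdcOp op, Runnable appendRow, Runnable deleteByKey) {
    switch (op) {
      case INSERT:
        appendRow.run();
        break;
      case DELETE:
        deleteByKey.run();
        break;
      default: // UPDATE
        deleteByKey.run();
        appendRow.run();
        break;
    }
  }

  public static void main(String[] args) {
    apply(
        fromCdcField("U"),
        () -> System.out.println("append new row version"),
        () -> System.out.println("equality-delete old row by its identifier columns"));
  }
}
```

Because updates and deletes are written as equality deletes against the identifier columns, the destination table must be in Iceberg v2 format and must have identifier fields, either set on the table or supplied via `iceberg.tables.default-id-columns`.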
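`BaseDeltaTaskWriter#write` checks whether an incoming record is a `RecordWrapper` and, if it is, uses the attached `Operation`; records that are not wrapped fall back to `INSERT`, or to `UPDATE` when upsert mode is enabled. Below is a minimal sketch of tagging a row this way; the two-column schema and field values are invented for illustration and are not taken from the patch or its tests.

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.connect.data.Operation;
import org.apache.iceberg.connect.data.RecordWrapper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class RecordWrapperSketch {
  public static void main(String[] args) {
    // Hypothetical table schema with "id" as the single identifier field.
    Schema schema =
        new Schema(
            List.of(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get())),
            Set.of(1));

    Record row = GenericRecord.create(schema);
    row.setField("id", 123L);
    row.setField("data", "hello world!");

    // Tag the row as a delete; a delta writer receiving this record would call
    // deleteKey() with the identifier-field projection instead of appending it.
    RecordWrapper tagged = new RecordWrapper(row, Operation.DELETE);
    System.out.println(tagged.op()); // DELETE
  }
}
```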
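`RecordProjection` exists so that `deleteKey` can see just the identifier columns of a row without materializing a copy: `wrap` swaps in a new underlying record, and `get` resolves positions through a precomputed position map. A small usage sketch, again against a hypothetical two-column schema:

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.connect.data.RecordProjection;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class RecordProjectionSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            List.of(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get())),
            Set.of(1)); // "id" is the identifier field

    // Same construction BaseDeltaTaskWriter uses for its delete (key) schema.
    Schema deleteSchema = TypeUtil.select(schema, schema.identifierFieldIds());

    Record row = GenericRecord.create(schema);
    row.setField("id", 123L);
    row.setField("data", "hello world!");

    // The projection is a reusable view over whichever record was last wrapped.
    RecordProjection key = RecordProjection.create(schema, deleteSchema);
    System.out.println(key.wrap(row).get(0, Long.class)); // 123
  }
}
```

Reusing a single projection per writer avoids allocating a key record for every delete, which is presumably why `BaseDeltaTaskWriter` keeps one `keyProjection` instance.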
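`PartitionedDeltaWriter.route` re-partitions one shared `PartitionKey` for every row and copies it before storing it as a map key, because storing the mutable, reused instance would corrupt the writer map as soon as the next row changed it. The sketch below shows the same pattern with plain Java collections; `MutableKey` is an invented stand-in for Iceberg's reusable partition key.

```java
import java.util.HashMap;
import java.util.Map;

public class ReusableKeySketch {

  // Deliberately mutable key, standing in for the reused PartitionKey instance.
  static final class MutableKey {
    long value;

    MutableKey copy() {
      MutableKey copied = new MutableKey();
      copied.value = value;
      return copied;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof MutableKey && ((MutableKey) other).value == value;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(value);
    }
  }

  public static void main(String[] args) {
    Map<MutableKey, String> writers = new HashMap<>();
    MutableKey reusable = new MutableKey();

    for (long partition : new long[] {1L, 2L, 1L}) {
      reusable.value = partition; // like partitionKey.partition(wrapper().wrap(row))
      // Copy before storing: if the shared key itself were stored, mutating it for
      // the next row would silently change the map entry's key.
      writers.computeIfAbsent(reusable.copy(), key -> "writer-for-partition-" + key.value);
    }

    System.out.println(writers.size()); // 2, one writer per distinct partition value
  }
}
```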
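Finally, `RecordUtils.createTableWriter` now chooses among four writers. The helper below restates that branching in one place purely for illustration; `WriterSelectionSketch` and `selectWriter` are not connector APIs, and the strings merely stand in for the writer classes.

```java
public class WriterSelectionSketch {

  // Delta writers are used whenever a CDC field is configured or upsert mode is
  // enabled; otherwise the existing append-only writers are kept.
  static String selectWriter(boolean partitioned, String cdcField, boolean upsertMode) {
    boolean appendOnly = cdcField == null && !upsertMode;
    if (partitioned) {
      return appendOnly ? "PartitionedAppendWriter" : "PartitionedDeltaWriter";
    }
    return appendOnly ? "UnpartitionedWriter" : "UnpartitionedDeltaWriter";
  }

  public static void main(String[] args) {
    System.out.println(selectWriter(true, "_cdc_op", false)); // PartitionedDeltaWriter
    System.out.println(selectWriter(false, null, true)); // UnpartitionedDeltaWriter
    System.out.println(selectWriter(false, null, false)); // UnpartitionedWriter
  }
}
```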