docs/docs/kafka-connect.md
@@ -38,6 +38,7 @@ The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.
* Commit coordination for centralized Iceberg commits
* Exactly-once delivery semantics
* Multi-table fan-out
* Change data capture
* Automatic table creation and schema evolution
* Field name mapping via Iceberg’s column mapping functionality

@@ -70,17 +71,19 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive |
| iceberg.tables.cdc-field | Source record field that identifies the type of operation (insert, update, or delete) |
| iceberg.tables.upsert-mode-enabled | Set to `true` to treat all appends as upserts, default is `false` |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.<_table-name_\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.<_table-name_\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.<_table-name_\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.<_table-name_\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (`cores * 2`) |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is no prefix |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
@@ -364,6 +367,86 @@ See above for creating two tables.
}
```

### Change data capture
This example applies inserts, updates, and deletes based on the value of a field in the record.
For example, if the `cdc-field` value is `I` or `R`, the record is inserted; if `U`, it is
upserted; and if `D`, it is deleted. This requires the table `format-version` to be 2 or higher.
The Iceberg identifier field(s) are used to identify a row. If identifier fields are not set on the table,
the `iceberg.tables.default-id-columns` or `iceberg.table.\<table name\>.id-columns` configuration
can be set instead. CDC can be combined with multi-table fan-out.
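
For example, with Spark and the Iceberg SQL extensions enabled, identifier fields can be declared directly on an existing table (a minimal sketch, assuming a `db.events` table whose non-null `id` column uniquely identifies a row):

```sql
-- Declare the column(s) that CDC updates and deletes are matched on
ALTER TABLE catalog_name.db.events SET IDENTIFIER FIELDS id
```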

CDC mode writes equality deletes to handle updates and deletes. During reads, the query engine must
apply equality deletes by scanning data files that may contain matching rows based on the identifier columns.

#### Production recommendations

**Compaction is required**: For production CDC workloads, periodic compaction is essential to maintain
query performance. Compaction merges equality deletes with their corresponding data files, reducing
the number of delete files that need to be processed during reads.

**Table partitioning**: Proper partitioning significantly reduces the scan overhead of equality deletes.
When a table is partitioned, equality deletes only need to be applied to data files within the same
partition. Choose partition columns that align with your CDC data patterns (e.g., date columns for
time-series data).
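
As an illustration, a destination table for time-series CDC data could be partitioned by day on its event timestamp (a sketch only; the `id`, `event_ts`, and `payload` columns are hypothetical and should match the schema of your records):

```sql
-- Daily partitions keep equality deletes scoped to a single day of data files
CREATE TABLE catalog_name.db.events (
  id BIGINT NOT NULL,
  event_ts TIMESTAMP,
  payload STRING)
USING iceberg
PARTITIONED BY (days(event_ts))
```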

**Identifier column selection**: The identifier columns define which rows are matched for updates and
deletes. These should be:

* Unique or form a composite unique key for the data
* Included in the table's partition spec when possible to limit delete scope

#### Compaction

Run compaction periodically to merge equality deletes with data files. This can be done using Spark:

```sql
-- Run compaction on the table
CALL catalog_name.system.rewrite_data_files('db.events')

-- Compaction with specific options
CALL catalog_name.system.rewrite_data_files(
  table => 'db.events',
  options => map('delete-file-threshold', '10')
)

-- Remove dangling delete files as part of compaction
CALL catalog_name.system.rewrite_data_files(
  table => 'db.events',
  options => map('remove-dangling-deletes', 'true')
)
```

Or using the Iceberg Actions API:

```java
// Rewrite data files, applying any associated delete files in the process
SparkActions.get(spark)
    .rewriteDataFiles(table)
    .option("delete-file-threshold", "10")
    .execute();
```

For automated compaction, consider scheduling these operations via a workflow orchestrator or
using managed Iceberg services that provide automatic compaction.
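
If only recent partitions receive CDC traffic, the compaction call above can also be scoped with the procedure's `where` argument so that only the affected partitions are rewritten (a sketch, reusing the hypothetical `event_ts` partition column):

```sql
-- Compact only the partitions that recent CDC writes have touched
CALL catalog_name.system.rewrite_data_files(
  table => 'db.events',
  where => 'event_ts >= "2024-01-01"',
  options => map('delete-file-threshold', '10')
)
```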


#### Create the destination table
See above for creating the table.

#### Connector config
```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.tables.cdc-field": "_cdc_op",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```

## SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs that could be useful when transforming Kafka data for use by the Iceberg sink connector.

IcebergSinkConfig.java
@@ -68,6 +68,9 @@ public class IcebergSinkConfig extends AbstractConfig {
  private static final String TABLES_PROP = "iceberg.tables";
  private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
  private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
  private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field";
  private static final String TABLES_UPSERT_MODE_ENABLED_PROP =
      "iceberg.tables.upsert-mode-enabled";
  private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
  private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
  private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
@@ -133,6 +136,18 @@ private static ConfigDef newConfigDef() {
        null,
        Importance.MEDIUM,
        "Source record field for routing records to tables");
    configDef.define(
        TABLES_CDC_FIELD_PROP,
        ConfigDef.Type.STRING,
        null,
        Importance.MEDIUM,
        "Source record field that identifies the type of operation (insert, update, or delete)");
    configDef.define(
        TABLES_UPSERT_MODE_ENABLED_PROP,
        ConfigDef.Type.BOOLEAN,
        false,
        Importance.MEDIUM,
        "Set to true to treat all appends as upserts, false otherwise");
    configDef.define(
        TABLES_DEFAULT_COMMIT_BRANCH,
        ConfigDef.Type.STRING,
@@ -415,6 +430,14 @@ public int commitTimeoutMs() {
    return getInt(COMMIT_TIMEOUT_MS_PROP);
  }

  /** Source record field that identifies the CDC operation, or null if CDC mode is not enabled. */
  public String tablesCdcField() {
    return getString(TABLES_CDC_FIELD_PROP);
  }

  /** Whether all appends should be treated as upserts. */
  public boolean isUpsertMode() {
    return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP);
  }

  public int commitThreads() {
    return getInt(COMMIT_THREADS_PROP);
  }

BaseDeltaWriter.java (new file)
@@ -0,0 +1,145 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.connect.data;

import java.io.IOException;
import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

abstract class BaseDeltaWriter extends BaseTaskWriter<Record> {

  private final Schema schema;
  private final Schema deleteSchema;
  private final InternalRecordWrapper wrapper;
  private final InternalRecordWrapper keyWrapper;
  private final RecordProjection keyProjection;
  private final boolean upsert;
  private final String[] cdcField;

  BaseDeltaWriter(
      PartitionSpec spec,
      FileFormat format,
      FileWriterFactory<Record> writerFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSize,
      Schema schema,
      Set<Integer> identifierFieldIds,
      boolean upsert,
      boolean useDv,
      String cdcField) {
    super(spec, format, writerFactory, fileFactory, io, targetFileSize, useDv);

    this.schema = schema;
    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds));
    this.wrapper = new InternalRecordWrapper(schema.asStruct());
    this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
    this.keyProjection = RecordProjection.create(schema, deleteSchema);
    this.upsert = upsert;

    if (cdcField == null || cdcField.isEmpty()) {
      throw new IllegalArgumentException("CDC field must be provided for delta writer");
    }
    // The CDC field may reference a nested field using dot notation
    this.cdcField = cdcField.split("\\.");
  }

  abstract RowDataDeltaWriter route(Record row);

  @Override
  public void write(Record row) throws IOException {
    Operation op = Operation.fromString(getCdcOpFromRow(row));
    RowDataDeltaWriter writer = route(row);

    switch (op) {
      case INSERT:
        writer.write(row);
        break;
      case UPDATE:
        // An update is applied as a delete of the existing row followed by an append of the new row
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        } else {
          writer.delete(row);
        }
        writer.write(row);
        break;
      case DELETE:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        } else {
          writer.delete(row);
        }
        break;
      default:
        throw new UnsupportedOperationException("Unknown row kind: " + op);
    }
  }

  private String getCdcOpFromRow(Record row) {
    // Walk the dotted CDC field path through nested records until a string value is found
    Record currentFieldLookup = row;
    for (String field : cdcField) {
      Object value = currentFieldLookup.getField(field);
      if (value == null) {
        throw new IllegalArgumentException("CDC field " + String.join(".", cdcField) + " is null");
      }
      if (value instanceof String) {
        return (String) value;
      } else {
        currentFieldLookup = (Record) value;
      }
    }
    throw new IllegalArgumentException(
        "CDC field " + String.join(".", cdcField) + " is not a string");
  }

  public InternalRecordWrapper getWrapper() {
    return wrapper;
  }

  protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {

    RowDataDeltaWriter(PartitionKey partition) {
      super(partition, schema, deleteSchema, DeleteGranularity.FILE, dvFileWriter());
    }

    @Override
    protected StructLike asStructLike(Record data) {
      return wrapper.wrap(data);
    }

    @Override
    protected StructLike asStructLikeKey(Record data) {
      return keyWrapper.wrap(data);
    }
  }
}