90 changes: 61 additions & 29 deletions docs/docs/kafka-connect.md
@@ -38,6 +38,7 @@ The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writ
* Commit coordination for centralized Iceberg commits
* Exactly-once delivery semantics
* Multi-table fan-out
* Change data capture
* Automatic table creation and schema evolution
* Field name mapping via Iceberg’s column mapping functionality

@@ -58,35 +59,37 @@ for exactly-once semantics. This requires Kafka 2.5 or later.

## Configuration

| Property | Description |
|--------------------------------------------|------------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
| Property | Description |
|---------------------------------------------------|------------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive |
| iceberg.tables.cdc-field | Source record field that identifies the type of operation (insert, update, or delete) |
| iceberg.tables.upsert-mode-enabled                 | Set to `true` to treat all appends as upserts, default is `false`                                                 |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is `cg-control` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
@@ -353,6 +356,35 @@ See above for creating two tables.
}
```

### Change data capture
This example applies inserts, updates, and deletes based on the value of a field in the record.
For example, if the `_cdc_op` field is set to `I` then the record is inserted, if `U` then it is
upserted, and if `D` then it is deleted. This requires that the table be in Iceberg v2 format.
The Iceberg identifier field(s) are used to identify a row. If identifier fields are not set on
the table, then the `iceberg.tables.default-id-columns` or `iceberg.table.\<table name\>.id-columns`
configuration can be set instead. CDC can be combined with multi-table fan-out.

#### Create the destination table
See above for creating the table.
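
For reference, here is a minimal sketch of creating such a table through the Iceberg Java catalog API, assuming a REST catalog and an illustrative schema with `id` as the identifier column (the class name, column names, and types are examples, not part of the connector):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;

public class CreateEventsTable {
  public static void main(String[] args) {
    // Catalog settings mirror the connector config below; values are placeholders.
    RESTCatalog catalog = new RESTCatalog();
    catalog.initialize(
        "iceberg",
        Map.of(
            "uri", "https://localhost",
            "credential", "<credential>",
            "warehouse", "<warehouse name>"));

    // Schema with "id" registered as the identifier field (field id 1);
    // CDC upserts and deletes use the identifier field(s) to locate rows.
    Schema schema =
        new Schema(
            List.of(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "type", Types.StringType.get()),
                Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()),
                Types.NestedField.optional(4, "payload", Types.StringType.get())),
            Set.of(1));

    // "format-version" = "2" creates the table in Iceberg v2 format, which CDC requires.
    catalog.createTable(
        TableIdentifier.of("default", "events"),
        schema,
        PartitionSpec.unpartitioned(),
        Map.of("format-version", "2"));
  }
}
```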

#### Connector config
```json
{
"name": "events-sink",
"config": {
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events",
"iceberg.tables": "default.events",
"iceberg.tables.cdc-field": "_cdc_op",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
}
}
```
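
For illustration, the sketch below produces CDC records to the `events` topic that this configuration would consume, assuming the Connect worker uses a JSON converter and `id` is the table's identifier column (bootstrap server, serializer settings, and field values are examples only):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CdcRecordProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Insert a row
      producer.send(new ProducerRecord<>("events",
          "{\"id\": 1, \"type\": \"signup\", \"payload\": \"new user\", \"_cdc_op\": \"I\"}"));
      // Upsert the same row (matched on the identifier column "id")
      producer.send(new ProducerRecord<>("events",
          "{\"id\": 1, \"type\": \"signup\", \"payload\": \"email confirmed\", \"_cdc_op\": \"U\"}"));
      // Delete the row (again matched on "id")
      producer.send(new ProducerRecord<>("events",
          "{\"id\": 1, \"_cdc_op\": \"D\"}"));
    }
  }
}
```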

## SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs that could be useful when transforming Kafka data for use by
@@ -68,6 +68,9 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field";
private static final String TABLES_UPSERT_MODE_ENABLED_PROP =
"iceberg.tables.upsert-mode-enabled";
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
@@ -129,6 +132,12 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"Source record field for routing records to tables");
configDef.define(
TABLES_CDC_FIELD_PROP,
ConfigDef.Type.STRING,
null,
Importance.MEDIUM,
"Source record field that identifies the type of operation (insert, update, or delete)");
configDef.define(
TABLES_DEFAULT_COMMIT_BRANCH,
ConfigDef.Type.STRING,
@@ -159,6 +168,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to set columns as optional during table create and evolution, false to respect schema");
configDef.define(
TABLES_UPSERT_MODE_ENABLED_PROP,
ConfigDef.Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to treat all appends as upserts, false otherwise");
configDef.define(
TABLES_SCHEMA_CASE_INSENSITIVE_PROP,
ConfigDef.Type.BOOLEAN,
@@ -361,6 +376,10 @@ public TableSinkConfig tableConfig(String tableName) {
});
}

public String tablesCdcField() {
return getString(TABLES_CDC_FIELD_PROP);
}

@VisibleForTesting
static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
@@ -397,6 +416,10 @@ public int commitTimeoutMs() {
return getInt(COMMIT_TIMEOUT_MS_PROP);
}

public boolean isUpsertMode() {
return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP);
}
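
  // Illustrative sketch, not part of this change: downstream writer construction could
  // consult these getters to choose between an append-only writer and an equality-delta
  // writer. The helper names below are hypothetical.
  //
  //   boolean deltaWrites = config.tablesCdcField() != null || config.isUpsertMode();
  //   return deltaWrites
  //       ? newEqualityDeltaWriter(table, config)   // emits equality deletes + data files
  //       : newAppendOnlyWriter(table, config);     // plain appends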

public int commitThreads() {
return getInt(COMMIT_THREADS_PROP);
}
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.data;

import java.io.IOException;
import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
private final Schema deleteSchema;
private final InternalRecordWrapper wrapper;
private final InternalRecordWrapper keyWrapper;
private final RecordProjection keyProjection;
private final boolean upsertMode;

BaseDeltaTaskWriter(
PartitionSpec spec,
FileFormat format,
FileAppenderFactory<Record> appenderFactory,
OutputFileFactory fileFactory,
FileIO io,
long targetFileSize,
Schema schema,
Set<Integer> identifierFieldIds,
boolean upsertMode) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds));
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
this.keyProjection = RecordProjection.create(schema, deleteSchema);
this.upsertMode = upsertMode;
}

abstract RowDataDeltaWriter route(Record row);

InternalRecordWrapper wrapper() {
return wrapper;
}

@Override
public void write(Record row) throws IOException {
Operation op;
if (row instanceof RecordWrapper) {
Review comment: If CDC and upsert mode are both enabled, the CDC insert will not be changed to an UPDATE, which I think might be wrong.
I raised a similar PR in the old Tabular code last week: databricks/iceberg-kafka-connect#332

op = ((RecordWrapper) row).op();
} else {
if (upsertMode) {
op = Operation.UPDATE;
} else {
op = Operation.INSERT;
}
}

RowDataDeltaWriter writer = route(row);
switch (op) {
case INSERT:
writer.write(row);
break;
case DELETE:
writer.deleteKey(keyProjection.wrap(row));
break;
default:
writer.deleteKey(keyProjection.wrap(row));
writer.write(row);
break;
}
}
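
  // Sketch of the reviewer's point above (illustrative only, not part of this change):
  // when both the CDC field and upsert mode are enabled, an explicit INSERT op could
  // arguably be promoted to an UPDATE so it replaces any existing row with the same key:
  //
  //   if (upsertMode && op == Operation.INSERT) {
  //     op = Operation.UPDATE;
  //   }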

class RowDataDeltaWriter extends BaseEqualityDeltaWriter {

RowDataDeltaWriter(PartitionKey partition) {
super(partition, schema, deleteSchema);
}

@Override
protected StructLike asStructLike(Record data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(Record data) {
return keyWrapper.wrap(data);
}
}
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.data;

public enum Operation {
INSERT,
UPDATE,
DELETE
}