ids) {
+ StructType structType = schema.asStruct();
+ return new RecordProjection(structType, TypeUtil.project(structType, ids));
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link Record} rows.
+ *
+ * <p>This projection does not work with repeated types like lists and maps.
+ *
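+ * <p>A minimal usage sketch (variable names are illustrative):
+ *
+ * <pre>{@code
+ * RecordProjection projection = RecordProjection.create(dataSchema, projectedSchema);
+ * Record projected = projection.wrap(row);
+ * }</pre>
+ *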
+ * @param dataSchema schema of rows wrapped by this projection
+ * @param projectedSchema result schema of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static RecordProjection create(Schema dataSchema, Schema projectedSchema) {
+ return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct());
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link Record} rows.
+ *
+ * <p>This projection does not work with repeated types like lists and maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static RecordProjection create(StructType structType, StructType projectedStructType) {
+ return new RecordProjection(structType, projectedStructType);
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link Record} rows.
+ *
+ * <p>This projection allows missing fields and does not work with repeated types like lists and
+ * maps.
+ *
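+ * <p>For example (illustrative), a projected optional field that is absent from the data type
+ * reads as {@code null}:
+ *
+ * <pre>{@code
+ * RecordProjection projection = RecordProjection.createAllowMissing(dataType, projectedType);
+ * Object value = projection.wrap(row).get(pos); // null when the field is missing from the data
+ * }</pre>
+ *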
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static RecordProjection createAllowMissing(
+ StructType structType, StructType projectedStructType) {
+ return new RecordProjection(structType, projectedStructType, true);
+ }
+
+ private final StructType type;
+ private final int[] positionMap;
+ private final RecordProjection[] nestedProjections;
+ private Record record;
+
+ private RecordProjection(
+ StructType type, int[] positionMap, RecordProjection[] nestedProjections) {
+ this.type = type;
+ this.positionMap = positionMap;
+ this.nestedProjections = nestedProjections;
+ }
+
+ private RecordProjection(StructType structType, StructType projection) {
+ this(structType, projection, false);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private RecordProjection(StructType structType, StructType projection, boolean allowMissing) {
+ this.type = projection;
+ this.positionMap = new int[projection.fields().size()];
+ this.nestedProjections = new RecordProjection[projection.fields().size()];
+
+ // set up the projection positions and any nested projections that are needed
+ List<NestedField> dataFields = structType.fields();
+ for (int pos = 0; pos < positionMap.length; pos += 1) {
+ NestedField projectedField = projection.fields().get(pos);
+
+ boolean found = false;
+ for (int i = 0; !found && i < dataFields.size(); i += 1) {
+ NestedField dataField = dataFields.get(i);
+ if (projectedField.fieldId() == dataField.fieldId()) {
+ found = true;
+ positionMap[pos] = i;
+ switch (projectedField.type().typeId()) {
+ case STRUCT:
+ nestedProjections[pos] =
+ new RecordProjection(
+ dataField.type().asStructType(), projectedField.type().asStructType());
+ break;
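+ // map keys/values and list elements can only be projected whole: a nested key,
+ // value, or element type must match the data type exactly or be primitive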
+ case MAP:
+ MapType projectedMap = projectedField.type().asMapType();
+ MapType originalMap = dataField.type().asMapType();
+
+ boolean keyProjectable =
+ !projectedMap.keyType().isNestedType()
+ || projectedMap.keyType().equals(originalMap.keyType());
+ boolean valueProjectable =
+ !projectedMap.valueType().isNestedType()
+ || projectedMap.valueType().equals(originalMap.valueType());
+ Preconditions.checkArgument(
+ keyProjectable && valueProjectable,
+ "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ case LIST:
+ ListType projectedList = projectedField.type().asListType();
+ ListType originalList = dataField.type().asListType();
+
+ boolean elementProjectable =
+ !projectedList.elementType().isNestedType()
+ || projectedList.elementType().equals(originalList.elementType());
+ Preconditions.checkArgument(
+ elementProjectable,
+ "Cannot project a partial list element struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ default:
+ nestedProjections[pos] = null;
+ }
+ }
+ }
+
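+ // optional fields missing from the data type are marked with position -1 and read as null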
+ if (!found && projectedField.isOptional() && allowMissing) {
+ positionMap[pos] = -1;
+ nestedProjections[pos] = null;
+ } else if (!found) {
+ throw new IllegalArgumentException(
+ String.format("Cannot find field %s in %s", projectedField, structType));
+ }
+ }
+ }
+
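+ /** Returns the number of projected fields found in the data type, excluding missing fields. */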
+ public int projectedFields() {
+ return (int) Ints.asList(positionMap).stream().filter(val -> val != -1).count();
+ }
+
+ public RecordProjection wrap(Record newRecord) {
+ this.record = newRecord;
+ return this;
+ }
+
+ public static RecordProjection copyFor(Record newRecord) {
+ int size = newRecord.size();
+ // identity mapping: each projected position reads the same position in the wrapped record
+ int[] identityMap = new int[size];
+ for (int i = 0; i < size; i += 1) {
+ identityMap[i] = i;
+ }
+ return new RecordProjection(newRecord.struct(), identityMap, new RecordProjection[size])
+ .wrap(newRecord);
+ }
+
+ @Override
+ public int size() {
+ return type.fields().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ // record can be null if wrap is not called first before the get call
+ // or if a null record is wrapped.
+ if (record == null) {
+ return null;
+ }
+
+ int recordPos = positionMap[pos];
+ if (nestedProjections[pos] != null) {
+ Record nestedStruct = record.get(recordPos, Record.class);
+ if (nestedStruct == null) {
+ return null;
+ }
+
+ return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
+ }
+
+ if (recordPos != -1) {
+ return record.get(recordPos, javaClass);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StructType struct() {
+ return type;
+ }
+
+ @Override
+ public Object getField(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setField(String name, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object get(int pos) {
+ return get(pos, Object.class);
+ }
+
+ @Override
+ public Record copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Record copy(Map<String, Object> overwriteValues) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
index 5ed820c8cbe1..4e1fe24fda79 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java
@@ -26,6 +26,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.data.GenericFileWriterFactory;
@@ -44,9 +45,13 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class RecordUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordUtils.class);
+
@SuppressWarnings("unchecked")
static Object extractFromRecordValue(Object recordValue, String fieldName) {
List<String> fields = Splitter.on('.').splitToList(fieldName);
@@ -159,20 +164,75 @@ public static TaskWriter<Record> createTableWriter(
.build();
TaskWriter<Record> writer;
- if (table.spec().isUnpartitioned()) {
- writer =
- new UnpartitionedWriter<>(
- table.spec(), format, writerFactory, fileFactory, table.io(), targetFileSize);
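+ // CDC is enabled when a CDC operation field is configured or upsert mode is on;
+ // both paths require a delta writer that can emit deletes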
+ boolean isCdcEnabled =
+ (config.tablesCdcField() != null && !config.tablesCdcField().isEmpty())
+ || config.isUpsertMode();
+ if (!isCdcEnabled) {
+ if (table.spec().isUnpartitioned()) {
+ writer =
+ new UnpartitionedWriter<>(
+ table.spec(), format, writerFactory, fileFactory, table.io(), targetFileSize);
+ } else {
+ writer =
+ new PartitionedAppendWriter(
+ table.spec(),
+ format,
+ writerFactory,
+ fileFactory,
+ table.io(),
+ targetFileSize,
+ table.schema());
+ }
} else {
- writer =
- new PartitionedAppendWriter(
- table.spec(),
- format,
- writerFactory,
- fileFactory,
- table.io(),
- targetFileSize,
- table.schema());
+
+ // delete vectors (DVs) are only supported for table format version >= 3
+ boolean useDv;
+ switch (TableUtil.formatVersion(table)) {
+ case 1:
+ throw new IllegalArgumentException(
+ "CDC and upsert modes are not supported for Iceberg table format version 1");
+ case 2:
+ LOG.warn(
+ "Table {} format version 2 detected. Delete Vectors are disabled. "
+ + "CDC and upsert modes work best with format version 3 or higher. "
+ + "Consider upgrading the table.",
+ tableReference.identifier());
+ useDv = false;
+ break;
+ default:
+ useDv = true;
+ break;
+ }
+
+ if (table.spec().isUnpartitioned()) {
+ writer =
+ new UnpartitionedDeltaWriter(
+ table.spec(),
+ format,
+ writerFactory,
+ fileFactory,
+ table.io(),
+ targetFileSize,
+ table.schema(),
+ identifierFieldIds,
+ config.isUpsertMode(),
+ useDv,
+ config.tablesCdcField());
+ } else {
+ writer =
+ new PartitionedDeltaWriter(
+ table.spec(),
+ format,
+ writerFactory,
+ fileFactory,
+ table.io(),
+ targetFileSize,
+ table.schema(),
+ identifierFieldIds,
+ config.isUpsertMode(),
+ useDv,
+ config.tablesCdcField());
+ }
}
return writer;
}
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000000..7c7267a98f4f
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+
+public class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+ private final RowDataDeltaWriter writer;
+
+ UnpartitionedDeltaWriter(
+ PartitionSpec spec,
+ FileFormat format,
+ FileWriterFactory<Record> fileWriterFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ Set<Integer> identifierFieldIds,
+ boolean upsert,
+ boolean useDv,
+ String cdcField) {
+ super(
+ spec,
+ format,
+ fileWriterFactory,
+ fileFactory,
+ io,
+ targetFileSize,
+ schema,
+ identifierFieldIds,
+ upsert,
+ useDv,
+ cdcField);
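+ // an unpartitioned table has no partition key, so the single writer gets a null partition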
+ this.writer = new RowDataDeltaWriter(null);
+ }
+
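+ /** Every row routes to the same writer, since an unpartitioned table has a single target. */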
+ @Override
+ RowDataDeltaWriter route(Record row) {
+ return this.writer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ super.close();
+ }
+}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/DeltaWriterTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/DeltaWriterTestBase.java
new file mode 100644
index 000000000000..a399dc64e247
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/DeltaWriterTestBase.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Base test class for Delta Writer tests with CDC (Change Data Capture) support.
+ *
+ * <p>Provides CDC-specific schemas with configurable CDC field locations and helper methods for
+ * creating CDC records and validating test results.
+ */
+public abstract class DeltaWriterTestBase extends WriterTestBase {
+
+ protected static final Schema CDC_SCHEMA =
+ new Schema(
+ ImmutableList.of(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "id2", Types.LongType.get()),
+ Types.NestedField.required(4, "_op", Types.StringType.get())),
+ ImmutableSet.of(1, 3));
+
+ protected static final Schema CDC_SCHEMA_NESTED =
+ new Schema(
+ ImmutableList.of(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "id2", Types.LongType.get()),
+ Types.NestedField.required(
+ 4,
+ "_cdc",
+ Types.StructType.of(
+ Types.NestedField.required(5, "op", Types.StringType.get())))),
+ ImmutableSet.of(1, 3));
+
+ /**
+ * Creates a CDC record with the given values using a flat CDC field location (e.g., "_op").
+ *
+ * @param id the id field value
+ * @param data the data field value
+ * @param id2 the id2 field value (second identifier)
+ * @param op the CDC operation (C/R for INSERT, U for UPDATE, D for DELETE)
+ * @return a CDC record with all fields set
+ */
+ protected Record createCDCRecord(long id, String data, long id2, String op) {
+ return createCDCRecord(id, data, id2, op, CDC_SCHEMA, "_op");
+ }
+
+ /**
+ * Creates a CDC record with the given values and CDC field location.
+ *
+ * @param id the id field value
+ * @param data the data field value
+ * @param id2 the id2 field value (second identifier)
+ * @param op the CDC operation (C/R for INSERT, U for UPDATE, D for DELETE)
+ * @param schema the schema to use for record creation
+ * @param cdcFieldPath the path to the CDC field (e.g., "_op" or "_cdc.op")
+ * @return a CDC record with all fields set
+ */
+ protected Record createCDCRecord(
+ long id, String data, long id2, String op, Schema schema, String cdcFieldPath) {
+ Record record = GenericRecord.create(schema);
+ record.setField("id", id);
+ record.setField("data", data);
+ record.setField("id2", id2);
+ setNestedField(record, cdcFieldPath, op);
+ return record;
+ }
+
+ /**
+ * Creates a CDC record with matching id and id2 values.
+ *
+ * @param id the id field value (also used for id2)
+ * @param data the data field value
+ * @param op the CDC operation (C/R for INSERT, U for UPDATE, D for DELETE)
+ * @return a CDC record with all fields set
+ */
+ protected Record createCDCRecord(long id, String data, String op) {
+ return createCDCRecord(id, data, id, op);
+ }
+
+ /**
+ * Creates a CDC record with matching id and id2 values using a custom schema and CDC field
+ * location.
+ *
+ * @param id the id field value (also used for id2)
+ * @param data the data field value
+ * @param schema the schema to use for record creation
+ * @param cdcFieldPath the path to the CDC field (e.g., "_op" or "_cdc.op")
+ * @param op the CDC operation (C/R for INSERT, U for UPDATE, D for DELETE)
+ * @return a CDC record with all fields set
+ */
+ protected Record createCDCRecord(
+ long id, String data, Schema schema, String cdcFieldPath, String op) {
+ return createCDCRecord(id, data, id, op, schema, cdcFieldPath);
+ }
+
+ /**
+ * Sets a field value, supporting nested paths like "_cdc.op".
+ *
+ * @param record the record to set the field on
+ * @param fieldPath the field path (e.g., "_op" or "_cdc.op")
+ * @param value the value to set
+ */
+ private void setNestedField(Record record, String fieldPath, String value) {
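+ // only a single level of nesting (e.g. "_cdc.op") is supported; deeper paths are not handled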
+ String[] parts = fieldPath.split("\\.");
+ if (parts.length == 1) {
+ record.setField(fieldPath, value);
+ } else {
+ Record nested = (Record) record.getField(parts[0]);
+ if (nested == null) {
+ // Create the nested record from the schema's struct type
+ Types.NestedField nestedField = record.struct().field(parts[0]);
+ if (nestedField == null) {
+ throw new IllegalStateException("Nested field " + parts[0] + " not found in schema");
+ }
+ Types.StructType structType = nestedField.type().asStructType();
+ nested = GenericRecord.create(structType);
+ record.setField(parts[0], nested);
+ }
+ nested.setField(parts[1], value);
+ }
+ }
+
+ /**
+ * Validates that the actual data matches the expected data, including the CDC field.
+ *
+ * <p>Note: This method validates by comparing record fields. For InMemoryFileIO-based tests, this
+ * is primarily useful for validating WriteResult contents.
+ *
+ * @param actual the actual records
+ * @param expected the expected records
+ */
+ protected void assertDataMatches(List<Record> actual, List<Record> expected) {
+ assertDataMatches(actual, expected, "_op");
+ }
+
+ /**
+ * Validates that the actual data matches the expected data, including the CDC field.
+ *
+ * <p>Note: This method validates by comparing record fields. For InMemoryFileIO-based tests, this
+ * is primarily useful for validating WriteResult contents.
+ *
+ * @param actual the actual records
+ * @param expected the expected records
+ * @param cdcFieldPath the path to the CDC field (e.g., "_op" or "_cdc.op")
+ */
+ protected void assertDataMatches(
+ List<Record> actual, List<Record> expected, String cdcFieldPath) {
+ assertThat(actual).hasSize(expected.size());
+
+ for (int i = 0; i < expected.size(); i++) {
+ Record expectedRecord = expected.get(i);
+ Record actualRecord = actual.get(i);
+
+ assertThat(actualRecord.getField("id")).isEqualTo(expectedRecord.getField("id"));
+ assertThat(actualRecord.getField("data")).isEqualTo(expectedRecord.getField("data"));
+ assertThat(actualRecord.getField("id2")).isEqualTo(expectedRecord.getField("id2"));
+ assertThat(getNestedField(actualRecord, cdcFieldPath))
+ .isEqualTo(getNestedField(expectedRecord, cdcFieldPath));
+ }
+ }
+
+ /**
+ * Validates that the actual data matches the expected data, ignoring the _op field.
+ *
+ * <p>Useful for validating the final data state after CDC operations have been applied, where the
+ * _op field is not relevant to the final result.
+ *
+ * @param actual the actual records
+ * @param expected the expected records (can use any _op value)
+ */
+ protected void assertDataMatchesIgnoringOp(List<Record> actual, List<Record> expected) {
+ assertThat(actual).hasSize(expected.size());
+
+ for (int i = 0; i < expected.size(); i++) {
+ Record expectedRecord = expected.get(i);
+ Record actualRecord = actual.get(i);
+
+ assertThat(actualRecord.getField("id")).isEqualTo(expectedRecord.getField("id"));
+ assertThat(actualRecord.getField("data")).isEqualTo(expectedRecord.getField("data"));
+ assertThat(actualRecord.getField("id2")).isEqualTo(expectedRecord.getField("id2"));
+ }
+ }
+
+ /**
+ * Helper method to validate that a record has the expected field values.
+ *
+ * @param record the record to validate
+ * @param expectedId the expected id value
+ * @param expectedData the expected data value
+ * @param expectedId2 the expected id2 value
+ * @param expectedOp the expected CDC operation value
+ */
+ protected void assertRecordEquals(
+ Record record, long expectedId, String expectedData, long expectedId2, String expectedOp) {
+ assertRecordEquals(record, expectedId, expectedData, expectedId2, expectedOp, "_op");
+ }
+
+ /**
+ * Helper method to validate that a record has the expected field values.
+ *
+ * @param record the record to validate
+ * @param expectedId the expected id value
+ * @param expectedData the expected data value
+ * @param expectedId2 the expected id2 value
+ * @param expectedOp the expected CDC operation value
+ * @param cdcFieldPath the path to the CDC field (e.g., "_op" or "_cdc.op")
+ */
+ protected void assertRecordEquals(
+ Record record,
+ long expectedId,
+ String expectedData,
+ long expectedId2,
+ String expectedOp,
+ String cdcFieldPath) {
+ assertThat(record.getField("id")).isEqualTo(expectedId);
+ assertThat(record.getField("data")).isEqualTo(expectedData);
+ assertThat(record.getField("id2")).isEqualTo(expectedId2);
+ assertThat(getNestedField(record, cdcFieldPath)).isEqualTo(expectedOp);
+ }
+
+ /**
+ * Gets a field value, supporting nested paths like "_cdc.op".
+ *
+ * @param record the record to get the field from
+ * @param fieldPath the field path (e.g., "_op" or "_cdc.op")
+ * @return the field value
+ */
+ private String getNestedField(Record record, String fieldPath) {
+ String[] parts = fieldPath.split("\\.");
+ if (parts.length == 1) {
+ return (String) record.getField(fieldPath);
+ } else {
+ Record nested = (Record) record.getField(parts[0]);
+ if (nested == null) {
+ throw new IllegalStateException("Nested field " + parts[0] + " not found");
+ }
+ return (String) nested.getField(parts[1]);
+ }
+ }
+
+ /**
+ * Helper method to read all data files from a WriteResult and return their contents.
+ *
+ * <p>Note: This is a placeholder for future implementation. Reading data back from InMemoryFileIO
+ * requires setting up Iceberg readers which may be complex.
+ *
+ * @param result the WriteResult to read from
+ * @return list of records read from data files
+ * @throws IOException if reading fails
+ */
+ protected List<Record> readDataFiles(org.apache.iceberg.io.WriteResult result)
+ throws IOException {
+ // TODO: Implement if needed for data validation
+ // This would require:
+ // 1. Creating a FileReaderFactory
+ // 2. Reading each DataFile from result.dataFiles()
+ // 3. Collecting all records into a list
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedDeltaWriter.java
new file mode 100644
index 000000000000..fed9ea47a17f
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedDeltaWriter.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+public class TestPartitionedDeltaWriter extends DeltaWriterTestBase {
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ // Override the schema for Partitioned CDC tests
+ when(table.schema()).thenReturn(CDC_SCHEMA);
+ when(table.spec()).thenReturn(SPEC);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testPartitionedDeltaWriterDVMode(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ Record row1 = createCDCRecord(123L, "partition1", "C");
+ Record row2 = createCDCRecord(234L, "partition2", "C");
+ Record row3 = createCDCRecord(345L, "partition1", "C");
+
+ WriteResult result =
+ writeTest(ImmutableList.of(row1, row2, row3), config, PartitionedDeltaWriter.class);
+
+ // all operations are inserts, so no delete files are expected
+ assertThat(result.dataFiles()).hasSize(2); // 2 partitions
+ assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format));
+ assertThat(result.deleteFiles()).hasSize(0);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testCDCOperationsAcrossPartitions(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ // Different operations in different partitions
+ Record insert1 = createCDCRecord(1L, "partition-a", "C");
+ Record insert2 = createCDCRecord(2L, "partition-b", "C");
+ Record update1 = createCDCRecord(1L, "partition-a", "U");
+ Record delete2 = createCDCRecord(2L, "partition-b", "D");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1, delete2),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // 2 partitions with data files and delete files
+ assertThat(result.dataFiles()).hasSize(2);
+ assertThat(result.deleteFiles()).hasSize(2);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testMultipleUpdatesPerPartition(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ // Multiple operations in the same partition
+ Record insert1 = createCDCRecord(1L, "same-partition", "C");
+ Record insert2 = createCDCRecord(2L, "same-partition", "C");
+ Record update1 = createCDCRecord(1L, "same-partition", "U");
+ Record update2 = createCDCRecord(2L, "same-partition", "U");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1, update2),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // Single partition with all operations
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testPartitionedInsertOnly(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ Record insert1 = createCDCRecord(1L, "p1", "C");
+ Record insert2 = createCDCRecord(2L, "p2", "C");
+ Record insert3 = createCDCRecord(3L, "p3", "C");
+ Record insert4 = createCDCRecord(4L, "p1", "R");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, insert3, insert4),
+ config,
+ PartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).hasSize(3);
+ assertThat(result.deleteFiles()).hasSize(0);
+
+ Arrays.asList(result.dataFiles())
+ .forEach(
+ file -> {
+ assertThat(file.format()).isEqualTo(FileFormat.fromString(format));
+ });
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testPartitionedDeleteOnly(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ // Inserts followed by deletes across partitions
+ Record insert1 = createCDCRecord(1L, "pa", "C");
+ Record insert2 = createCDCRecord(2L, "pb", "C");
+ Record delete1 = createCDCRecord(1L, "pa", "D");
+ Record delete2 = createCDCRecord(2L, "pb", "D");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, delete1, delete2),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // 2 partitions with data and delete files
+ assertThat(result.dataFiles()).hasSize(2);
+ assertThat(result.deleteFiles()).hasSize(2);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testMixedOperationsSinglePartition(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ // Mix of INSERT, UPDATE, DELETE in single partition
+ Record insert1 = createCDCRecord(1L, "partition-x", "C");
+ Record insert2 = createCDCRecord(2L, "partition-x", "C");
+ Record insert3 = createCDCRecord(3L, "partition-x", "C");
+ Record update1 = createCDCRecord(1L, "partition-x", "U");
+ Record delete2 = createCDCRecord(2L, "partition-x", "D");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, insert3, update1, delete2),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // Single partition with all operations
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testPartitionedNonUpsertMode(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(false); // Non-upsert mode
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ Record insert1 = createCDCRecord(1L, "part-a", "C");
+ Record insert2 = createCDCRecord(2L, "part-b", "C");
+ Record update1 = createCDCRecord(1L, "part-a", "U");
+ Record delete2 = createCDCRecord(2L, "part-b", "D");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1, delete2),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // 2 partitions with data and delete files
+ assertThat(result.dataFiles()).hasSize(2);
+ assertThat(result.deleteFiles()).hasSize(2);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testEmptyPartitions(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ when(table.spec()).thenReturn(SPEC);
+
+ // Empty write should produce no files
+ WriteResult result = writeTest(ImmutableList.of(), config, PartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).isEmpty();
+ assertThat(result.deleteFiles()).isEmpty();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testNestedCDCMixedOperationsMultiplePartitions(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_cdc.op");
+
+ // Update table schema to use nested CDC schema
+ when(table.schema()).thenReturn(CDC_SCHEMA_NESTED);
+ when(table.spec()).thenReturn(SPEC);
+
+ // Multiple partitions with mixed INSERT, UPDATE, DELETE operations using nested CDC field
+ Record row1 = createCDCRecord(1L, "insert-partition-a", CDC_SCHEMA_NESTED, "_cdc.op", "C");
+ Record row2 = createCDCRecord(2L, "insert-partition-b", CDC_SCHEMA_NESTED, "_cdc.op", "R");
+ Record row1Update =
+ createCDCRecord(1L, "updated-partition-a", CDC_SCHEMA_NESTED, "_cdc.op", "U");
+ Record row3 = createCDCRecord(3L, "insert-partition-a", CDC_SCHEMA_NESTED, "_cdc.op", "C");
+ Record row2Delete =
+ createCDCRecord(2L, "insert-partition-b", CDC_SCHEMA_NESTED, "_cdc.op", "D");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(row1, row2, row1Update, row3, row2Delete),
+ config,
+ PartitionedDeltaWriter.class);
+
+ // Multiple partitions with mixed operations
+ assertThat(result.dataFiles()).isNotEmpty();
+ assertThat(result.deleteFiles()).isNotEmpty();
+ }
+}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordUtils.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordUtils.java
index 7f6d01b8a3c3..0ae8cbf9eb38 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordUtils.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordUtils.java
@@ -19,9 +19,32 @@
package org.apache.iceberg.connect.data;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.LocationProviders;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -90,4 +113,86 @@ public void testExtractFromRecordValueMapNull() {
result = RecordUtils.extractFromRecordValue(val, "xkey");
assertThat(result).isNull();
}
+
+ private static final org.apache.iceberg.Schema CDC_SCHEMA =
+ new org.apache.iceberg.Schema(
+ ImmutableList.of(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "id2", Types.LongType.get()),
+ Types.NestedField.required(4, "_op", Types.StringType.get())),
+ ImmutableSet.of(1, 3));
+
+ @Test
+ public void testCreateTableWriterFormatVersion1ThrowsException() {
+ Table table = createMockTable(1);
+ IcebergSinkConfig config = createCdcConfig();
+ TableReference tableReference =
+ TableReference.of("test_catalog", TableIdentifier.of("test_table"), UUID.randomUUID());
+
+ assertThatThrownBy(() -> RecordUtils.createTableWriter(table, tableReference, config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "CDC and upsert modes are not supported for Iceberg table format version 1");
+ }
+
+ @Test
+ public void testCreateTableWriterFormatVersion2CreatesEqualityDeleteWriter() {
+ Table table = createMockTable(2);
+ IcebergSinkConfig config = createCdcConfig();
+ TableReference tableReference =
+ TableReference.of("test_catalog", TableIdentifier.of("test_table"), UUID.randomUUID());
+
+ try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table, tableReference, config)) {
+ // Format version 2 should create UnpartitionedDeltaWriter with useDv=false
+ assertThat(writer).isInstanceOf(UnpartitionedDeltaWriter.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testCreateTableWriterFormatVersion3CreatesDeleteVectorWriter() {
+ Table table = createMockTable(3);
+ IcebergSinkConfig config = createCdcConfig();
+ TableReference tableReference =
+ TableReference.of("test_catalog", TableIdentifier.of("test_table"), UUID.randomUUID());
+
+ try (TaskWriter<Record> writer = RecordUtils.createTableWriter(table, tableReference, config)) {
+ // Format version 3 should create UnpartitionedDeltaWriter with useDv=true
+ assertThat(writer).isInstanceOf(UnpartitionedDeltaWriter.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Table createMockTable(int formatVersion) {
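+ // mock HasTableOperations so TableUtil.formatVersion(table) can read the stubbed
+ // format version via operations().current().formatVersion()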
+ InMemoryFileIO fileIO = new InMemoryFileIO();
+ Table table = mock(Table.class, withSettings().extraInterfaces(HasTableOperations.class));
+ when(table.schema()).thenReturn(CDC_SCHEMA);
+ when(table.spec()).thenReturn(PartitionSpec.unpartitioned());
+ when(table.io()).thenReturn(fileIO);
+ when(table.locationProvider())
+ .thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of()));
+ when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance());
+ when(table.properties()).thenReturn(ImmutableMap.of());
+
+ TableOperations ops = mock(TableOperations.class);
+ TableMetadata metadata = mock(TableMetadata.class);
+ when(metadata.formatVersion()).thenReturn(formatVersion);
+ when(ops.current()).thenReturn(metadata);
+ when(((HasTableOperations) table).operations()).thenReturn(ops);
+
+ return table;
+ }
+
+ private IcebergSinkConfig createCdcConfig() {
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of());
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+ return config;
+ }
}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestUnpartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestUnpartitionedDeltaWriter.java
new file mode 100644
index 000000000000..e72da0347d9e
--- /dev/null
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestUnpartitionedDeltaWriter.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+public class TestUnpartitionedDeltaWriter extends DeltaWriterTestBase {
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ // Override the schema for CDC tests
+ when(table.schema()).thenReturn(CDC_SCHEMA);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testCDCInsertOperations(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ Record row1 = createCDCRecord(1L, "insert-c", "C");
+ Record row2 = createCDCRecord(2L, "insert-r", "R");
+ Record row3 = createCDCRecord(3L, "insert-c2", "C");
+
+ WriteResult result =
+ writeTest(ImmutableList.of(row1, row2, row3), config, UnpartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(0);
+
+ Arrays.asList(result.dataFiles())
+ .forEach(
+ file -> {
+ assertThat(file.format()).isEqualTo(FileFormat.fromString(format));
+ assertThat(file.recordCount()).isEqualTo(3);
+ });
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testCDCUpdateOperations(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ // INSERT followed by UPDATE
+ Record row1Insert = createCDCRecord(100L, "original", "C");
+ Record row1Update = createCDCRecord(100L, "updated", "U");
+
+ WriteResult result =
+ writeTest(ImmutableList.of(row1Insert, row1Update), config, UnpartitionedDeltaWriter.class);
+
+ // In upsert mode, UPDATE = delete by key + insert
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testCDCDeleteOperations(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ // INSERT followed by DELETE
+ Record row1Insert = createCDCRecord(200L, "to-be-deleted", "C");
+ Record row1Delete = createCDCRecord(200L, "to-be-deleted", "D");
+
+ WriteResult result =
+ writeTest(ImmutableList.of(row1Insert, row1Delete), config, UnpartitionedDeltaWriter.class);
+
+ // In upsert mode, DELETE = delete by key (no insert)
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testCDCMixedOperations(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ // Mix of INSERT, UPDATE, DELETE in single batch
+ Record insert1 = createCDCRecord(1L, "row1", "C");
+ Record insert2 = createCDCRecord(2L, "row2", "C");
+ Record update1 = createCDCRecord(1L, "row1-updated", "U");
+ Record delete2 = createCDCRecord(2L, "row2", "D");
+ Record insert3 = createCDCRecord(3L, "row3", "C");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1, delete2, insert3),
+ config,
+ UnpartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testNonUpsertMode(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(false); // Non-upsert mode
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ Record insert = createCDCRecord(1L, "row1", "C");
+ Record update = createCDCRecord(1L, "row1-updated", "U");
+ Record delete = createCDCRecord(1L, "row1-updated", "D");
+
+ WriteResult result =
+ writeTest(ImmutableList.of(insert, update, delete), config, UnpartitionedDeltaWriter.class);
+
+ // In non-upsert mode, deletes use full row comparison
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testDataCorrectnessDVMode(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ Record insert1 = createCDCRecord(1L, "data1", "C");
+ Record insert2 = createCDCRecord(2L, "data2", "C");
+ Record update1 = createCDCRecord(1L, "data1-updated", "U");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1), config, UnpartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+
+ Arrays.asList(result.dataFiles())
+ .forEach(
+ file -> {
+ assertThat(file.format()).isEqualTo(FileFormat.fromString(format));
+ assertThat(file.recordCount()).isEqualTo(3);
+ });
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testEmptyWriteResult(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_op");
+
+ WriteResult result = writeTest(ImmutableList.of(), config, UnpartitionedDeltaWriter.class);
+
+ // Empty write should produce no files
+ assertThat(result.dataFiles()).isEmpty();
+ assertThat(result.deleteFiles()).isEmpty();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"parquet,2", "parquet,3", "orc,2", "orc,3"})
+ public void testNestedCDCMixedOperations(String format, int formatVersion) {
+ mockTableFormatVersion(formatVersion);
+
+ IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+ when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+ when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+ when(config.isUpsertMode()).thenReturn(true);
+ when(config.tablesDefaultIdColumns()).thenReturn("id,id2");
+ when(config.tablesCdcField()).thenReturn("_cdc.op");
+
+ // Update table schema to use nested CDC schema
+ when(table.schema()).thenReturn(CDC_SCHEMA_NESTED);
+
+ // Mix of INSERT, UPDATE, DELETE in single batch with nested CDC field
+ Record insert1 = createCDCRecord(1L, "row1", CDC_SCHEMA_NESTED, "_cdc.op", "C");
+ Record insert2 = createCDCRecord(2L, "row2", CDC_SCHEMA_NESTED, "_cdc.op", "C");
+ Record update1 = createCDCRecord(1L, "row1-updated", CDC_SCHEMA_NESTED, "_cdc.op", "U");
+ Record delete2 = createCDCRecord(2L, "row2", CDC_SCHEMA_NESTED, "_cdc.op", "D");
+ Record insert3 = createCDCRecord(3L, "row3", CDC_SCHEMA_NESTED, "_cdc.op", "C");
+
+ WriteResult result =
+ writeTest(
+ ImmutableList.of(insert1, insert2, update1, delete2, insert3),
+ config,
+ UnpartitionedDeltaWriter.class);
+
+ assertThat(result.dataFiles()).hasSize(1);
+ assertThat(result.deleteFiles()).hasSize(1);
+ }
+}
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
index 30b60fb3c542..31f1193feb8c 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/WriterTestBase.java
@@ -21,15 +21,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.UUID;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.TableReference;
@@ -64,7 +68,7 @@ public class WriterTestBase {
public void before() {
fileIO = new InMemoryFileIO();
- table = mock(Table.class);
+ table = mock(Table.class, withSettings().extraInterfaces(HasTableOperations.class));
when(table.schema()).thenReturn(SCHEMA);
when(table.spec()).thenReturn(PartitionSpec.unpartitioned());
when(table.io()).thenReturn(fileIO);
@@ -72,6 +76,23 @@ public void before() {
.thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of()));
when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance());
when(table.properties()).thenReturn(ImmutableMap.of());
+
+ // Default to format version 3 (supports delete vectors)
+ mockTableFormatVersion(3);
+ }
+
+ /**
+ * Configure the mock table to return the specified format version. This is useful for testing
+ * behavior that depends on the table format version (e.g., CDC/upsert mode support).
+ *
+ * @param formatVersion the format version to mock (1, 2, or 3)
+ */
+ protected void mockTableFormatVersion(int formatVersion) {
+ TableOperations ops = mock(TableOperations.class);
+ TableMetadata metadata = mock(TableMetadata.class);
+ when(metadata.formatVersion()).thenReturn(formatVersion);
+ when(ops.current()).thenReturn(metadata);
+ when(((HasTableOperations) table).operations()).thenReturn(ops);
}
protected WriteResult writeTest(