
Conversation

t3hw commented Dec 8, 2025

Introduce Delta Writer functionality for both unpartitioned and partitioned tables, enabling CDC and upsert modes. Enhance configuration options for CDC fields, upsert mode, and DV usage.

Inspired by #12070
Resolves #10842

@bryanck

edit:
"using DVs for CDC" - DVs only help for in-batch deduplication.
Out of batch deletes/updates fall back to equality deletes.

Partitioning the table is highly recommended, periodically compacting the table when using CDC mode is mandatory.
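For illustration, a connector configuration of the kind this PR targets might look like the following. The property names are partly taken from later comments in this thread and from the original non-Apache sink, so they may not match the final implementation; _cdc_op is a placeholder for whatever record field carries the insert/update/delete operation:

    iceberg.tables: db.events
    iceberg.tables.cdc-field: _cdc_op
    iceberg.tables.upsert-mode-enabled: "true"
    iceberg.tables.write-props.format-version: "3"

Format version 3 is what enables deletion vectors, as discussed below.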

bryanck (Contributor) left a comment


Thanks for the PR.

As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.

We should revisit those discussions and resolve those concerns before we proceed with this.

t3hw (Author) commented Dec 8, 2025

> Thanks for the PR.
>
> As you may know, the original (non-Apache) sink had delta writer support that relied on equality deletes. When the sink was contributed to this project, the community decided it was best to remove that functionality, as using it can result in severely degraded performance. This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues.
>
> We should revisit those discussions and resolve those concerns before we proceed with this.

Thanks for the reply!
I followed the Flink sink implementation and attempted to port it to Kafka Connect.
Would it be viable to use the connector plugin for CDC with DV mode enabled?
Also, should the Iceberg table properties be set to merge-on-read for this use case?
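For context, the merge-on-read behavior referenced here is controlled by standard Iceberg table properties, which can be set directly on the table (or passed as table properties at creation time), for example:

    write.delete.mode: merge-on-read
    write.update.mode: merge-on-read
    write.merge.mode: merge-on-read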

hpcnt-stewart commented Dec 12, 2025

I believe this feature is absolutely essential.

SeongJuMoon commented Dec 12, 2025

Thanks for contributing to the Iceberg ecosystem.
In my view, the Kafka sink connector could readily support SCD Type 1 pipelines. While this functionality may be difficult to land due to various community considerations, such as potential degradation in write and read performance, I ultimately hope it will be integrated so that more users can benefit from Iceberg's advantages.

t3hw (Author) commented Dec 12, 2025

For what it's worth, I tested it under a modest CDC load, and it seems to be working fine.
I can make another commit that locks the CDC mode to use DV only, but that would make the connector incompatible with v2 tables.
Regardless, the exception thrown when the table is a v2 table makes it clear that using DV mode is what causes the failure. At this point, users can decide for themselves how they want to proceed.

t3hw requested a review from bryanck on December 15, 2025 09:53
rajansadasivan commented Dec 17, 2025

We tested this PR branch on AWS with the following properties on the Kafka Connect sink connector:
iceberg.tables.use-dv: "true"
iceberg.tables.write-props.format-version: "3"

We found the table was created as version 2 in the metadata file. So is this upsert & DV support for version 2 tables only?
The upsert mode for insert/update/delete worked as expected for all tables with PK=id.

t3hw (Author) commented Dec 17, 2025

> We tested this PR branch on AWS with the following properties on the Kafka Connect sink connector:
> iceberg.tables.use-dv: "true"
> iceberg.tables.write-props.format-version: "3"
>
> We found the table was created as version 2 in the metadata file. So is this upsert & DV support for version 2 tables only?
> The upsert mode for insert/update/delete worked as expected for all tables with PK=id.

Yeah, it's a limitation. I probably should have made a note about it above, but it is somewhat out of scope for this PR.

What I ended up doing is writing some code that interfaces with the Iceberg catalog outside of Kafka Connect and initializes the table ahead of time with the correct format version.
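For anyone facing the same issue, a minimal sketch of that pre-creation step using the Iceberg catalog API might look like this; the table identifier, schema, and partition spec are placeholders, not taken from this PR:

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.types.Types;

    public class PreCreateCdcTable {
      // Pre-creates the target table as format v3 so DVs are available to the sink.
      public static void create(Catalog catalog) {
        Schema schema = new Schema(
            List.of(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "ts", Types.TimestampType.withZone()),
                Types.NestedField.optional(3, "payload", Types.StringType.get())),
            Set.of(1)); // identifier field ids: the "primary key" used for equality deletes

        PartitionSpec spec = PartitionSpec.builderFor(schema).day("ts").build();

        catalog.createTable(
            TableIdentifier.of("db", "events"),
            schema,
            spec,
            Map.of("format-version", "3")); // request a v3 table up front
      }
    }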

nalawadechinmay commented Jan 6, 2026

@bryanck @t3hw Great to see the progress here! Our team is looking to use this connector for a project that requires Iceberg sink connector upsert/CDC support. Is there any guidance on whether the implementation in this PR will be accommodated in a later release? Any insight into the timeline or priority would be very helpful for our planning.

karankk007 commented Jan 6, 2026

@bryanck @t3hw Great progress here! We are looking to use the Apache Iceberg sink connector, and we need upsert functionality in the sink connector.

Could you please clarify whether there are plans to support or merge this functionality in the future? If this is on the roadmap, we can proceed with this approach. Otherwise, we may need to consider alternative solutions, such as using Flink.

It would be helpful to understand the expected direction or future of this PR. Thanks for the guidance.

ashokcoupa commented
@bryanck +1 in strong support of this PR.

The addition of Delta Writer and upsert/CDC support in the Kafka Connect sink unlocks important production use cases. It significantly simplifies incremental and CDC-based data pipelines.

Many users, including our team, are looking to adopt Iceberg for CDC/upsert workflows, and having this merged into main would meaningfully improve adoption while reducing long-term maintenance and workarounds.

We’ve also tested this PR under a sustained load of ~100 TPS for about 30 minutes and observed a stable lag in the range of 7–10 minutes. Notably, the lag did not scale proportionally with load, which is a promising signal for production readiness.

Thanks for the thoughtful work on this, could the community consider moving this change forward in the near future? We’d be excited to see it included in an upcoming release.

supalp commented Jan 6, 2026

@bryanck @t3hw and community,

We need the ability to merge updates into our tables, and this PR solves that. I understand the review and merge can take time, so I'm not asking to rush it, but we would really benefit from feedback from the community on:

  1. Does this capability align with the roadmap for the Kafka Connect sink?
  2. If not, what are the major roadblocks?

We are blocked on our implementation pending this PR's merge, hence asking whether this is confirmed to be on the roadmap or not. Appreciate your urgent attention and everything you and other contributors do.

Thank you!

hladush commented Jan 9, 2026

@bryanck could you review this and, if you’re comfortable, approve?

We’re seeing growing interest from teams that manage their own Kafka infrastructure in using this sink. While our Data Platform teams can achieve similar outcomes with Spark Streaming or the older Iceberg sink (https://github.com/databricks/iceberg-kafka-connect), I believe the current Iceberg sink simplifies day-to-day work for a lot of acquisition teams.
You noted: "This can lead to poor user experience, reflect badly on the project, and increase support-related questions. Also, there are alternative solutions that don't have the same issues." It's a valid point and totally makes sense. In my view, the main gap is documentation - something we can fix. Strong docs and examples should reduce the support load significantly. The alternatives either require additional infrastructure (e.g., Flink) or are less straightforward for teams to operate (Spark).
If you’re open to it, I’ll add a docs package (setup, config, examples, and a troubleshooting guide) as part of this change. Happy to discuss.

bryanck (Contributor) commented Jan 9, 2026

@hladush This is something the Iceberg community has decided, including PMC members, so we'd need to get the community on board in order to proceed. You can raise this as a topic on the dev list (again) or in a community sync if you want.

hladush commented Jan 13, 2026

@bryanck thanks a lot for your response. Could you please point me to any docs on how to do that?

supalp commented Jan 13, 2026

@hladush - thanks for voicing support for this change. We would love to partner and add our perspective on why this is an important change for the PMC to consider, and to get this raised on the dev list as suggested by @bryanck.

t3hw changed the title from "Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode" to "Implement Iceberg Kafka Connect with Delta Writer Support in DV Mode for in-batch deduplication" on Jan 13, 2026
t3hw (Author) commented Jan 13, 2026

Added a small clarification to the PR description:

"using DVs for CDC" - DVs only help with in-batch deduplication.
Out-of-batch deletes/updates fall back to equality deletes.
Partitioning the table is highly recommended, and periodically compacting the table is mandatory when using CDC mode.
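Since that compaction has to run outside Kafka Connect, a minimal sketch of a periodic maintenance job using Iceberg's Spark actions might look like the following; the option name is taken from the Iceberg maintenance docs, and scheduling and catalog/table loading are left out:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class CdcTableMaintenance {
      // Rewrites small data files, applies accumulated deletes, and consolidates the
      // remaining delete files; intended to run on a schedule (e.g. hourly).
      public static void compact(SparkSession spark, Table table) {
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .option("delete-file-threshold", "1") // rewrite any data file that has deletes attached
            .execute();

        SparkActions.get(spark)
            .rewritePositionDeleteFiles(table)
            .execute();
      }
    }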

t3hw (Author) commented Jan 13, 2026

Pasting my comment from another discussion about this PR:

Original commenter said:
The primary issue with using DVs for this is identifying the position of a deleted row in the dataset. If you receive a delete record for an existing row, you need to scan the table to locate the file+position in order to create the DV. That results in lots of scan behavior and data materialization on the worker, which really isn't practical/scalable.
That issue is the main one that equality deletes address, but equality deletes also introduce a significant maintenance burden, and that maintenance isn't available natively in KC. This means that in order to use this approach with KC, you also need Spark or equivalent to aggressively maintain the deletes.
Based on the PR, I'm not clear how these issues are addressed.

And the response:
My implementation of BaseDeltaWriter inherits the insertedRowMap from the BaseEqualityDeltaWriter class. It deletes using the DV Writer by default, while falling back to Equality Deletes if the key was not found. The map is cleared when the batch is closed and the file gets written.
I believe RisingWave is using a similar approach.
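As a rough illustration of that routing, here is a simplified sketch with made-up type names that do not match the PR's actual classes:

    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-ins for the actual writers; illustrative only.
    interface DataWriter { FilePosition write(Object row); }
    interface PositionDeleteWriter { void delete(String path, long pos); }
    interface EqualityDeleteWriter { void write(Object key); }
    record FilePosition(String path, long pos) {}

    class InBatchDeltaWriterSketch {
      private final DataWriter data;
      private final PositionDeleteWriter positionDeletes;
      private final EqualityDeleteWriter equalityDeletes;
      // key -> file/position of rows written in the *current* batch only
      private final Map<Object, FilePosition> insertedRowMap = new HashMap<>();

      InBatchDeltaWriterSketch(DataWriter d, PositionDeleteWriter p, EqualityDeleteWriter e) {
        this.data = d;
        this.positionDeletes = p;
        this.equalityDeletes = e;
      }

      void insert(Object key, Object row) {
        insertedRowMap.put(key, data.write(row));
      }

      void delete(Object key) {
        FilePosition pos = insertedRowMap.remove(key);
        if (pos != null) {
          // Row was written earlier in this batch: emit a position delete (a DV on v3 tables).
          positionDeletes.delete(pos.path(), pos.pos());
        } else {
          // Row lives in an already-committed file: fall back to an equality delete.
          equalityDeletes.write(key);
        }
      }

      void closeBatch() {
        // The map only covers the open batch; it is discarded when the files are closed.
        insertedRowMap.clear();
      }
    }

In upsert mode, an update would typically be handled as a delete on the key followed by an insert.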

If this gets accepted, the documentation should be updated to let users know that compaction is highly recommended.
The implementation includes PartitionedDeltaWriter which routes records to partition-specific writers. This means equality deletes are scoped to their partition - query engines only need to scan files within that partition, not the entire table. For well-partitioned tables (e.g., by date), this significantly reduces the read-time cost of equality deletes.
Another approach would be to keep a separate index of the id-field->parquet file, but running compactions is required regardless, so this is kind of a wasted effort as it would need to be replicated, persisted, or managed outside of kafka connect, and it would lose its state and have to be re-built once a compaction is triggered.
I agree the PR description was misleading about "using DVs for CDC" - DVs only help for in-batch deduplication. I'll update the description to be clearer.

Given that equality deletes + compaction is how Flink handles CDC (and is the industry standard), would this approach be acceptable if we:

  1. Update the PR description to accurately describe the behavior
  2. Document that periodic compaction is required for production use
  3. Recommend proper table partitioning to minimize equality delete scan overhead
  4. Provide example compaction commands in the docs

sagarm-traveloka commented
Thank you @t3hw for the quick fix for the cdc-field - it works. 👍

t3hw (Author) commented Jan 20, 2026

Removed the use-dv property in favor of automatic format-version detection

(cc @hladush @rajansadasivan)
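A minimal sketch of what that detection could look like, with API usage assumed from the core Iceberg library rather than taken from this PR:

    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Table;

    public class DvSupport {
      // DVs are a format v3 feature; older tables fall back to other delete files.
      // Assumes the loaded table is a BaseTable so its metadata can be inspected.
      public static boolean useDeletionVectors(Table table) {
        int formatVersion = ((BaseTable) table).operations().current().formatVersion();
        return formatVersion >= 3;
      }
    }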

t3hw force-pushed the cdc-support-with-DV branch from 2dc6b8f to 6f410bd on January 20, 2026 13:29
t3hw force-pushed the cdc-support-with-DV branch from 6f410bd to 6e5d469 on January 20, 2026 14:02