Flink: Add CDC (Change Data Capture) streaming read support #15282
Open
fightBoxing wants to merge 3 commits into apache:main from
This PR adds CDC support to the Flink Iceberg source, allowing users to read
changelog data with the proper RowKind (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER).
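
For readers less familiar with the RowKind semantics mentioned above, here is a minimal, dependency-free sketch of how a downstream consumer might fold such a changelog into current state. All class, field, and method names are hypothetical and not part of this PR; the nested enum only mirrors the four kind names listed above and is not Flink's own RowKind class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these types exist in the PR.
final class ChangelogApplySketch {

  // Stand-in for Flink's RowKind, reduced to the four names from the description.
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // A simplified changelog record: a kind plus a keyed integer value.
  static final class ChangeRow {
    final Kind kind;
    final String key;
    final int value;

    ChangeRow(Kind kind, String key, int value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }
  }

  // Folds changelog rows, in order, into the current state keyed by `key`.
  static Map<String, Integer> apply(List<ChangeRow> rows) {
    Map<String, Integer> state = new LinkedHashMap<>();
    for (ChangeRow row : rows) {
      switch (row.kind) {
        case INSERT:
        case UPDATE_AFTER:
          // Both kinds carry the new image of the row.
          state.put(row.key, row.value);
          break;
        case UPDATE_BEFORE:
        case DELETE:
          // Both kinds retract the old image of the row.
          state.remove(row.key);
          break;
      }
    }
    return state;
  }

  public static void main(String[] args) {
    List<ChangeRow> rows = List.of(
        new ChangeRow(Kind.INSERT, "a", 1),
        new ChangeRow(Kind.INSERT, "b", 5),
        new ChangeRow(Kind.UPDATE_BEFORE, "a", 1),
        new ChangeRow(Kind.UPDATE_AFTER, "a", 2),
        new ChangeRow(Kind.DELETE, "b", 5));
    System.out.println(apply(rows)); // prints {a=2}
  }
}
```

An append-only read would surface only inserted rows, so a consumer could not see the retraction of a=1 or the deletion of b; that is the gap the changelog mode is meant to close.
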
Key changes:
- Add StreamingReadMode enum (APPEND_ONLY, CHANGELOG)
- Add ChangelogDataIterator for iterating changelog scan tasks
- Add RowDataChangelogScanTaskReader for reading changelog data
- Add ChangelogScanSplit for CDC split handling
- Add ChangelogRowDataReaderFunction for reader function support
- Modify FlinkReadOptions/FlinkReadConf to support streaming-read-mode config
- Modify ScanContext to support changelog scan mode
- Modify FlinkSplitPlanner to plan changelog scan tasks
- Modify ContinuousSplitPlannerImpl to support CDC mode
- Modify IcebergSource to support streamingReadMode builder method
- Modify IcebergTableSource to support CDC ChangelogMode
- Add integration tests for CDC streaming read
Usage:
- Java API: IcebergSource.forRowData().streamingReadMode(StreamingReadMode.CHANGELOG)
- SQL: SELECT * FROM table /*+ OPTIONS('streaming-read-mode' = 'CHANGELOG') */
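
As a rough illustration of how the streaming-read-mode option could resolve to the enum, here is a self-contained sketch. Only the two enum constants and the option key come from the description above; the parsing helper, its case-insensitive matching, and the APPEND_ONLY default are assumptions and may differ from what FlinkReadConf actually does in this PR.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only; not the PR's FlinkReadOptions/FlinkReadConf code.
final class StreamingReadModeSketch {

  // Constant names taken from the PR description.
  enum StreamingReadMode { APPEND_ONLY, CHANGELOG }

  // Option key taken from the SQL hint in the PR description.
  static final String OPTION_KEY = "streaming-read-mode";

  // Resolves the mode from a plain options map.
  // Assumption: a missing option falls back to APPEND_ONLY.
  static StreamingReadMode fromOptions(Map<String, String> options) {
    String raw = options.get(OPTION_KEY);
    if (raw == null) {
      return StreamingReadMode.APPEND_ONLY;
    }
    // Assumption: accept lower-case and hyphenated spellings as well.
    String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
    // valueOf throws IllegalArgumentException for unknown values.
    return StreamingReadMode.valueOf(normalized);
  }

  public static void main(String[] args) {
    System.out.println(fromOptions(Map.of(OPTION_KEY, "CHANGELOG")));
    System.out.println(fromOptions(Map.of()));
  }
}
```

Failing fast on an unknown value keeps a typo in the SQL hint from silently falling back to append-only reads.
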
Supported Flink versions: v1.20, v2.0, v2.1
added 2 commits
February 10, 2026 17:09
- Fix line length violations in FlinkSplitPlanner, IcebergSource, RowDataChangelogScanTaskReader
- Remove unused import FlinkReadConf in IcebergTableSource
- Fix indentation in ContinuousSplitPlannerImpl
- Remove trailing blank lines in ChangelogDataIterator
- Fix test formatting in TestIcebergSourceCdcStreaming

- Rewrite ChangelogRowDataReaderFunction to implement ReaderFunction<RowData> directly instead of extending DataIteratorReaderFunction, fixing:
  - batcher() method not accessible (private in parent class)
  - ChangelogDataIterator not compatible with DataIterator type hierarchy
- Add custom ChangelogBatchIterator using Pool and ArrayBatchRecords for efficient batch processing of changelog records
- Delegate normal (non-changelog) splits to RowDataReaderFunction
- Fix v2.0 API incompatibility: use correct Flink 2.0 imports (org.apache.flink.table.legacy.api.TableSchema)
- Fix v2.0 IcebergTableSource: use correct applyProjection(int[][], DataType) signature matching Flink 2.0 SupportsProjectionPushDown interface
- Add CDC getChangelogMode support to v2.0 and v2.1 IcebergTableSource
- Fix PreferUncheckedIoException: use UncheckedIOException instead of RuntimeException when wrapping IOException in ContinuousSplitPlannerImpl
- Remove unused 'limit' field from ChangelogRowDataReaderFunction
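
The PreferUncheckedIoException fix mentioned in this commit follows a common JDK pattern, sketched below with only the standard library. The interface, method names, and error message are hypothetical stand-ins and are not the actual ContinuousSplitPlannerImpl code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only; not code from the PR.
final class UncheckedIoSketch {

  // Stand-in for a planning step that may fail with a checked IOException.
  interface IoSupplier<T> {
    T get() throws IOException;
  }

  // Runs the step and rethrows IOException as UncheckedIOException,
  // so callers can still recover the original cause with its IO-specific type.
  static <T> T callUnchecked(IoSupplier<T> step) {
    try {
      return step.get();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to plan splits", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(callUnchecked(() -> "planned"));
    try {
      callUnchecked(() -> {
        throw new IOException("scan failed");
      });
    } catch (UncheckedIOException e) {
      // getCause() is typed as IOException, unlike with a plain RuntimeException.
      System.out.println(e.getCause().getMessage());
    }
  }
}
```
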
mxm (Contributor) reviewed on Feb 13, 2026:
Thanks for the PR @fightBoxing!
- Do you mind briefly describing the approach taken in this PR? I'm assuming this does some kind of merge-on-read. What is the general architecture? Are there any limitations to the approach taken in this PR?
- Could you remove all the files except for Flink 2.1? We usually merge support for the latest version first and then backport to the older ones.