pulsar: support debezium protocol #5054
Extend IsPulsarSupportedProtocols to include ProtocolDebezium so that Pulsar changefeeds can use the debezium message format. The debezium codec is already implemented and shared with the Kafka sink via the common codec builder, so no additional encoding logic is needed.
Code Review
This pull request adds support for the Debezium protocol to the Pulsar sink by updating the protocol validation logic and error messages. Feedback includes a recommendation to replace a hardcoded boolean with a dynamic protocol check in the event router, a request for additional test coverage in existing protocol parsing and string conversion tests, and a minor grammatical correction in a user-facing error message.
- Make the `isAvro` parameter dynamic in the `NewEventRouter` call
- Add `debezium` and `simple` cases to the protocol parsing and string tests
/ok-to-test
Please add the Debezium integration tests for Pulsar.
Pulsar Debezium integration tests have been added. Thanks!
/test all
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: 3AceShowHand, flowbehappy, wk989898
4495deb to aa6dee8
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/integration_tests/debezium_basic/run.sh (1)
`13-15`: Prevent silent no-op on unexpected `SINK_TYPE` in `debezium_basic` CI runs
`tests/integration_tests/run.sh` invokes each integration test as `bash "$script" "$sink_type"`, and `run_light_it_in_ci.sh` / `run_heavy_it_in_ci.sh` exit on any `sink_type` other than `mysql|kafka|pulsar|storage`. In `run_heavy_it_in_ci.sh`, `debezium_basic` is scheduled only in the `kafka_groups` and `pulsar_groups`, so `tests/integration_tests/debezium_basic/run.sh` is only called with `kafka` or `pulsar` in CI, meaning the `return` branch for other values won't be hit. Add an explicit error/skip message (instead of a bare `return`) if you want protection for manual/developer invocations with an unexpected argument.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration_tests/debezium_basic/run.sh` around lines 13 - 15, The script currently does a silent `return` when SINK_TYPE is not "kafka" or "pulsar", which can hide unexpected invocations; replace the bare `return` in the conditional that checks the SINK_TYPE variable with an explicit message and exit to make behavior clear (e.g., echo "Skipping debezium_basic: unsupported SINK_TYPE='$SINK_TYPE'" and exit 0 for a deliberate skip, or exit 1 if you prefer treating it as an error). Update the branch guarding SINK_TYPE so it prints that message (including the value of SINK_TYPE) and exits accordingly instead of silently returning.
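The guard suggested above could be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `check_sink_type` is a hypothetical helper name, and whether an unsupported value should be a skip or an error is left to the caller.

```shell
#!/bin/bash
# check_sink_type is a hypothetical helper (not part of the PR).
# It returns 0 for sink types debezium_basic supports and prints an
# explicit message to stderr for anything else.
check_sink_type() {
	local sink_type="$1"
	case "$sink_type" in
	kafka | pulsar)
		return 0
		;;
	*)
		echo "Skipping debezium_basic: unsupported SINK_TYPE='$sink_type'" >&2
		return 1
		;;
	esac
}

# Assumed usage inside run.sh, treating an unsupported type as a deliberate skip:
#   check_sink_type "$SINK_TYPE" || exit 0
```

Using `exit 1` in place of `exit 0` in the usage line would treat an unexpected argument as an error instead of a skip.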
fi

if [ "$SINK_TYPE" == "pulsar" ]; then
	run_pulsar_cluster $WORK_DIR normal
Quote command arguments expanded from variables.
Line [32], Line [36], Line [40], and Line [44] use unquoted variable expansions in command arguments. This can break on whitespace/glob characters and is easy to harden.
Suggested patch
- run_pulsar_cluster $WORK_DIR normal
+ run_pulsar_cluster "$WORK_DIR" normal
@@
- cdc_cli_changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml
+ cdc_cli_changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml"
@@
- run_kafka_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml
+ run_kafka_consumer "$WORK_DIR" "$SINK_URI" "$CUR/conf/changefeed.toml"
@@
- run_pulsar_consumer --upstream-uri $SINK_URI --config $CUR/conf/changefeed.toml
+ run_pulsar_consumer --upstream-uri "$SINK_URI" --config "$CUR/conf/changefeed.toml"
Also applies to: 36-36, 40-40, 44-44
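To make the word-splitting concern concrete, here is a small standalone sketch. `count_args` is a hypothetical stand-in for helpers such as `run_pulsar_cluster`, and the `WORK_DIR` value with spaces is an assumed example, not a path used by the test suite.

```shell
#!/bin/bash
# count_args is a hypothetical stand-in for a helper like run_pulsar_cluster;
# it only reports how many arguments it received.
count_args() {
	echo "$#"
}

# Assumed example value containing spaces (not a path from the test suite).
WORK_DIR="/tmp/tidb cdc test"

# Unquoted expansion is split on whitespace: the helper sees 4 arguments.
unquoted=$(count_args $WORK_DIR normal)

# Quoted expansion keeps the path as one argument: the helper sees 2 arguments.
quoted=$(count_args "$WORK_DIR" normal)

echo "unquoted=$unquoted quoted=$quoted"
```

With the default `IFS`, this prints `unquoted=4 quoted=2`, which is the behavior ShellCheck's SC2086 warns about.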
🧰 Tools
🪛 Shellcheck (0.11.0)
[info] 32-32: Double quote to prevent globbing and word splitting.
(SC2086)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration_tests/debezium_basic/run.sh` at line 32, Calls to
run_pulsar_cluster (and similar commands) use unquoted variable expansions like
run_pulsar_cluster $WORK_DIR normal which can break on whitespace or globbing;
update each call (referenced occurrences of run_pulsar_cluster and any other
command invocations using WORK_DIR or similar variables at the noted locations)
to quote variables (e.g. run_pulsar_cluster "$WORK_DIR" "normal" or quote each
expanded variable) so arguments are passed safely and globbing/word-splitting is
avoided.
Source: Linters/SAST tools
What problem does this PR solve?
Issue Number: close #5056
The Pulsar sink currently only supports `canal-json`. Users who consume TiCDC events via Pulsar with Debezium-compatible consumers (e.g. Flink CDC) have no way to use the standard Debezium message format.
What is changed and how it works?
- Extend `IsPulsarSupportedProtocols()` in `pkg/config/sink_protocol.go` to include `ProtocolDebezium`.
- Update the error message in `downstreamadapter/sink/pulsar/helper.go` to reflect the expanded protocol list.
- The debezium codec is already implemented and shared with the Kafka sink via the common codec builder, so no additional encoding logic is needed.
- Unit test: `TestIsPulsarSupportedProtocols`.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. Existing `canal-json` behavior is unchanged.
Do you need to update user documentation, design documentation or monitoring documentation?
The Pulsar sink docs should note that `debezium` is now a valid `protocol` value.
Release note