[FLINK-36549] Fix using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss #25919

Open
wants to merge 1 commit into
base: master
Conversation

xiaoyu19950911

What is the purpose of the change

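With ignore-parse-errors enabled, the Debezium, Canal, Maxwell, and Ogg JSON formats should skip only parse errors. After this change, enabling the parameter no longer swallows exceptions raised while the deserialized data is being emitted, which previously led to unexpected data loss.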

Brief change log

  • With ignore-parse-errors enabled, only parse errors are skipped; exceptions raised while emitting the deserialized data are propagated instead of being ignored.
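
A minimal sketch of the idea, not the actual Flink code: parsing is guarded by the ignore-parse-errors flag, while emitting happens outside the guarded block so that a failure in the collector still surfaces. `SketchCollector`, `BufferedEmitSketch`, and the comma-separated `parse` helper are hypothetical stand-ins for Flink's `Collector<RowData>` and the JSON deserializers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector<T>; not the real interface.
interface SketchCollector<T> {
    void collect(T record);
}

final class BufferedEmitSketch {

    // Hypothetical parser: splits "a,b" into rows and rejects blank fields.
    static List<String> parse(String message) {
        List<String> rows = new ArrayList<>();
        for (String field : message.split(",")) {
            if (field.trim().isEmpty()) {
                throw new IllegalArgumentException("Corrupt message: " + message);
            }
            rows.add(field.trim());
        }
        return rows;
    }

    static void deserialize(
            String message, boolean ignoreParseErrors, SketchCollector<String> out) {
        if (message == null || message.isEmpty()) {
            return;
        }
        List<String> rows;
        try {
            // Only the parsing step is covered by ignoreParseErrors.
            rows = parse(message);
        } catch (RuntimeException e) {
            if (ignoreParseErrors) {
                return; // skip the corrupt message
            }
            throw e;
        }
        // Emitting happens outside the try block, so an exception thrown by
        // the collector (for example by a chained downstream operator)
        // propagates to the caller instead of being swallowed.
        for (String row : rows) {
            out.collect(row);
        }
    }
}
```

In this sketch a corrupt message such as `"a,,b"` is dropped when the flag is set, whereas an exception thrown inside `out.collect` always reaches the caller.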

Verifying this change

This change added tests and can be verified as follows:

  • CanalJsonSerDeSchemaTest#testIgnoreParseErrors
  • DebeziumJsonSerDeSchemaTest#testIgnoreParseErrors
  • MaxwellJsonSerDerTest#testIgnoreParseErrors
  • OggJsonSerDeSchemaTest#testIgnoreParseErrors

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Jan 8, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@xiaoyu19950911
Author

@flinkbot run azure

… Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss
@davidradl
Contributor

Reviewed by Chi on 09/01/2025. Needs a committer to review.

Member

@libenchao libenchao left a comment


@xiaoyu19950911 Thank you for your contribution. I've left my comments; the PR is generally in good shape.

@@ -214,6 +215,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
        if (message == null || message.length == 0) {
            return;
        }
        List<GenericRowData> genericRowDataList = new ArrayList<>();
Member


We could add a transient field to CanalJsonDeserializationSchema to avoid creating a new list for every record; that would be more efficient. WDYT?
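
A rough sketch of what this suggestion could look like, under the assumption that the schema is a `Serializable` object whose buffer does not need to survive serialization. `ReusableBufferSketch` and its comma-splitting logic are hypothetical and only illustrate the reuse pattern, not the real CanalJsonDeserializationSchema.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ReusableBufferSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: the buffer is scratch space, so it is not serialized with
    // the schema and is recreated lazily on first use.
    private transient List<String> buffer;

    void deserialize(String message, Consumer<String> out) {
        if (buffer == null) {
            buffer = new ArrayList<>();
        }
        // Clear at the start so rows left over from a previous call that
        // failed while emitting are never emitted again.
        buffer.clear();
        for (String field : message.split(",")) {
            buffer.add(field);
        }
        for (String row : buffer) {
            out.accept(row);
        }
    }
}
```

The list is allocated once per schema instance instead of once per record; the trade-off is that the instance must not be used concurrently from several threads.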

@@ -73,10 +76,69 @@ void testFilteringTables() throws Exception {
                        InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
                .setDatabase("^my.*")
                .setTable("^prod.*")
                .setIgnoreParseErrors(true)
Member


What's the purpose of enabling ignore-parse-errors for this test case?


@Override
public void collect(RowData record) {
    if (record.getRowKind().equals(RowKind.DELETE) && record.getInt(0) == 103) {
Member


This criterion is too specific. Could we just throw an exception directly, no matter what the data is? We only need to test that when an exception occurs in a chained downstream operator, it is correctly thrown.

@@ -319,4 +381,22 @@ public void close() {
            // do nothing
        }
    }

    private static class SpecialCollector implements Collector<RowData> {
Member


How about naming it ThrowingExceptionCollector?
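
Taken together with the earlier comment about throwing unconditionally, the renamed collector might look roughly like the sketch below. `RowCollector` is a hypothetical stand-in for Flink's `Collector<T>` (modeled with `collect` and `close`), and the exception type and message are assumptions.

```java
// Hypothetical stand-in for Flink's Collector<T>; not the real interface.
interface RowCollector<T> {
    void collect(T record);

    void close();
}

// Test helper that simulates a failing chained downstream operator by
// throwing for every record, regardless of its content.
final class ThrowingExceptionCollector<T> implements RowCollector<T> {

    @Override
    public void collect(T record) {
        throw new RuntimeException("Simulated downstream failure for record: " + record);
    }

    @Override
    public void close() {
        // do nothing
    }
}
```

Because the collector no longer inspects the row kind or field values, a test using it only has to assert that the exception reaches the caller even when ignore-parse-errors is enabled.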


4 participants