Skip to content

Commit bd6e112

Browse files
author
xiaoyu
committed
[FLINK-36549][formats] Fix using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss
1 parent 08990c7 commit bd6e112

File tree

12 files changed

+417
-38
lines changed

12 files changed

+417
-38
lines changed

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.io.Serializable;
43+
import java.util.ArrayList;
4344
import java.util.List;
4445
import java.util.Objects;
4546
import java.util.regex.Pattern;
@@ -214,6 +215,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
214215
if (message == null || message.length == 0) {
215216
return;
216217
}
218+
List<GenericRowData> genericRowDataList = new ArrayList<>();
217219
try {
218220
final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
219221
if (database != null) {
@@ -238,7 +240,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
238240
for (int i = 0; i < data.size(); i++) {
239241
GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
240242
insert.setRowKind(RowKind.INSERT);
241-
emitRow(row, insert, out);
243+
genericRowDataList.add(handleRow(row, insert));
242244
}
243245
} else if (OP_UPDATE.equals(type)) {
244246
// "data" field is an array of row, contains new rows
@@ -260,16 +262,16 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
260262
}
261263
before.setRowKind(RowKind.UPDATE_BEFORE);
262264
after.setRowKind(RowKind.UPDATE_AFTER);
263-
emitRow(row, before, out);
264-
emitRow(row, after, out);
265+
genericRowDataList.add(handleRow(row, before));
266+
genericRowDataList.add(handleRow(row, after));
265267
}
266268
} else if (OP_DELETE.equals(type)) {
267269
// "data" field is an array of row, contains deleted rows
268270
ArrayData data = row.getArray(0);
269271
for (int i = 0; i < data.size(); i++) {
270272
GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
271273
insert.setRowKind(RowKind.DELETE);
272-
emitRow(row, insert, out);
274+
genericRowDataList.add(handleRow(row, insert));
273275
}
274276
} else if (OP_CREATE.equals(type)) {
275277
// "data" field is null and "type" is "CREATE" which means
@@ -290,14 +292,15 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
290292
format("Corrupt Canal JSON message '%s'.", new String(message)), t);
291293
}
292294
}
295+
for (GenericRowData genericRowData : genericRowDataList) {
296+
out.collect(genericRowData);
297+
}
293298
}
294299

295-
private void emitRow(
296-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
300+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
297301
// shortcut in case no output projection is required
298302
if (!hasMetadata) {
299-
out.collect(physicalRow);
300-
return;
303+
return physicalRow;
301304
}
302305
final int physicalArity = physicalRow.getArity();
303306
final int metadataArity = metadataConverters.length;
@@ -310,7 +313,7 @@ private void emitRow(
310313
producedRow.setField(
311314
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
312315
}
313-
out.collect(producedRow);
316+
return producedRow;
314317
}
315318

316319
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.io.IOException;
3737
import java.io.Serializable;
38+
import java.util.ArrayList;
3839
import java.util.List;
3940
import java.util.Objects;
4041
import java.util.stream.Collectors;
@@ -132,6 +133,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
132133
// skip tombstone messages
133134
return;
134135
}
136+
List<GenericRowData> genericRowDataList = new ArrayList<>();
135137
try {
136138
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
137139
GenericRowData payload;
@@ -146,23 +148,23 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
146148
String op = payload.getField(2).toString();
147149
if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
148150
after.setRowKind(RowKind.INSERT);
149-
emitRow(row, after, out);
151+
genericRowDataList.add(handleRow(row, after));
150152
} else if (OP_UPDATE.equals(op)) {
151153
if (before == null) {
152154
throw new IllegalStateException(
153155
String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
154156
}
155157
before.setRowKind(RowKind.UPDATE_BEFORE);
156158
after.setRowKind(RowKind.UPDATE_AFTER);
157-
emitRow(row, before, out);
158-
emitRow(row, after, out);
159+
genericRowDataList.add(handleRow(row, before));
160+
genericRowDataList.add(handleRow(row, after));
159161
} else if (OP_DELETE.equals(op)) {
160162
if (before == null) {
161163
throw new IllegalStateException(
162164
String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
163165
}
164166
before.setRowKind(RowKind.DELETE);
165-
emitRow(row, before, out);
167+
genericRowDataList.add(handleRow(row, before));
166168
} else {
167169
if (!ignoreParseErrors) {
168170
throw new IOException(
@@ -178,14 +180,15 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
178180
format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
179181
}
180182
}
183+
for (GenericRowData genericRowData : genericRowDataList) {
184+
out.collect(genericRowData);
185+
}
181186
}
182187

183-
private void emitRow(
184-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
188+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
185189
// shortcut in case no output projection is required
186190
if (!hasMetadata) {
187-
out.collect(physicalRow);
188-
return;
191+
return physicalRow;
189192
}
190193

191194
final int physicalArity = physicalRow.getArity();
@@ -202,8 +205,7 @@ private void emitRow(
202205
producedRow.setField(
203206
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
204207
}
205-
206-
out.collect(producedRow);
208+
return producedRow;
207209
}
208210

209211
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.io.IOException;
3838
import java.io.Serializable;
39+
import java.util.ArrayList;
3940
import java.util.List;
4041
import java.util.Objects;
4142
import java.util.stream.Collectors;
@@ -125,6 +126,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
125126
if (message == null || message.length == 0) {
126127
return;
127128
}
129+
List<GenericRowData> genericRowDataList = new ArrayList<>();
128130
try {
129131
final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
130132
final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
@@ -133,7 +135,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
133135
// "data" field is a row, contains inserted rows
134136
GenericRowData insert = (GenericRowData) row.getRow(0, fieldCount);
135137
insert.setRowKind(RowKind.INSERT);
136-
emitRow(row, insert, out);
138+
genericRowDataList.add(handleRow(row, insert));
137139
} else if (OP_UPDATE.equals(type)) {
138140
// "data" field is a row, contains new rows
139141
// "old" field is a row, contains old values
@@ -151,13 +153,13 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
151153
}
152154
before.setRowKind(RowKind.UPDATE_BEFORE);
153155
after.setRowKind(RowKind.UPDATE_AFTER);
154-
emitRow(row, before, out);
155-
emitRow(row, after, out);
156+
genericRowDataList.add(handleRow(row, before));
157+
genericRowDataList.add(handleRow(row, after));
156158
} else if (OP_DELETE.equals(type)) {
157159
// "data" field is a row, contains deleted rows
158160
GenericRowData delete = (GenericRowData) row.getRow(0, fieldCount);
159161
delete.setRowKind(RowKind.DELETE);
160-
emitRow(row, delete, out);
162+
genericRowDataList.add(handleRow(row, delete));
161163
} else {
162164
if (!ignoreParseErrors) {
163165
throw new IOException(
@@ -173,14 +175,15 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
173175
format("Corrupt Maxwell JSON message '%s'.", new String(message)), t);
174176
}
175177
}
178+
for (GenericRowData genericRowData : genericRowDataList) {
179+
out.collect(genericRowData);
180+
}
176181
}
177182

178-
private void emitRow(
179-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
183+
private GenericRowData handleRow(GenericRowData rootRow, GenericRowData physicalRow) {
180184
// shortcut in case no output projection is required
181185
if (!hasMetadata) {
182-
out.collect(physicalRow);
183-
return;
186+
return physicalRow;
184187
}
185188
final int metadataArity = metadataConverters.length;
186189
final GenericRowData producedRow =
@@ -192,7 +195,7 @@ private void emitRow(
192195
producedRow.setField(
193196
fieldCount + metadataPos, metadataConverters[metadataPos].convert(rootRow));
194197
}
195-
out.collect(producedRow);
198+
return producedRow;
196199
}
197200

198201
@Override

flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.io.IOException;
3737
import java.io.Serializable;
38+
import java.util.ArrayList;
3839
import java.util.List;
3940
import java.util.Objects;
4041
import java.util.stream.Collectors;
@@ -160,6 +161,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
160161
// skip tombstone messages
161162
return;
162163
}
164+
List<GenericRowData> genericRowDataList = new ArrayList<>();
163165
try {
164166
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
165167

@@ -168,23 +170,23 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
168170
String op = row.getField(2).toString();
169171
if (OP_CREATE.equals(op)) {
170172
after.setRowKind(RowKind.INSERT);
171-
emitRow(row, after, out);
173+
genericRowDataList.add(emitRow(row, after));
172174
} else if (OP_UPDATE.equals(op)) {
173175
if (before == null) {
174176
throw new IllegalStateException(
175177
String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
176178
}
177179
before.setRowKind(RowKind.UPDATE_BEFORE);
178180
after.setRowKind(RowKind.UPDATE_AFTER);
179-
emitRow(row, before, out);
180-
emitRow(row, after, out);
181+
genericRowDataList.add(emitRow(row, before));
182+
genericRowDataList.add(emitRow(row, after));
181183
} else if (OP_DELETE.equals(op)) {
182184
if (before == null) {
183185
throw new IllegalStateException(
184186
String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
185187
}
186188
before.setRowKind(RowKind.DELETE);
187-
emitRow(row, before, out);
189+
genericRowDataList.add(emitRow(row, before));
188190
} else {
189191
if (!ignoreParseErrors) {
190192
throw new IOException(
@@ -200,16 +202,17 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
200202
format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
201203
}
202204
}
205+
for (GenericRowData genericRowData : genericRowDataList) {
206+
out.collect(genericRowData);
207+
}
203208
}
204209

205210
// --------------------------------------------------------------------------------------------
206211

207-
private void emitRow(
208-
GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
212+
private GenericRowData emitRow(GenericRowData rootRow, GenericRowData physicalRow) {
209213
// shortcut in case no output projection is required
210214
if (!hasMetadata) {
211-
out.collect(physicalRow);
212-
return;
215+
return physicalRow;
213216
}
214217

215218
final int physicalArity = physicalRow.getArity();
@@ -226,8 +229,7 @@ private void emitRow(
226229
producedRow.setField(
227230
physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
228231
}
229-
230-
out.collect(producedRow);
232+
return producedRow;
231233
}
232234

233235
@Override

0 commit comments

Comments
 (0)