Skip to content

Commit 39b5fbc

Browse files
authored
[source]Add compatibility for IP data types after Doris 2.1.3 (#576)
1 parent a8decd9 commit 39b5fbc

File tree

2 files changed

+200
-201
lines changed

2 files changed

+200
-201
lines changed

Diff for: flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java

+62-47
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import org.apache.arrow.vector.complex.reader.FieldReader;
4747
import org.apache.arrow.vector.ipc.ArrowReader;
4848
import org.apache.arrow.vector.ipc.ArrowStreamReader;
49-
import org.apache.arrow.vector.types.Types;
49+
import org.apache.arrow.vector.types.Types.MinorType;
5050
import org.apache.doris.flink.exception.DorisException;
5151
import org.apache.doris.flink.exception.DorisRuntimeException;
5252
import org.apache.doris.flink.rest.models.Schema;
@@ -177,7 +177,7 @@ public RowBatch readArrow() {
177177
throw new DorisException(
178178
"Load Doris data failed, schema size of fetch data is wrong.");
179179
}
180-
if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
180+
if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
181181
logger.debug("One batch in arrow has no data.");
182182
continue;
183183
}
@@ -217,12 +217,12 @@ public void convertArrowToRowBatch() throws DorisException {
217217
try {
218218
for (int col = 0; col < fieldVectors.size(); col++) {
219219
FieldVector fieldVector = fieldVectors.get(col);
220-
Types.MinorType minorType = fieldVector.getMinorType();
220+
MinorType minorType = fieldVector.getMinorType();
221221
final String currentType = schema.get(col).getType();
222222
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
223223
boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector);
224224
if (!passed) {
225-
throw new java.lang.IllegalArgumentException(
225+
throw new IllegalArgumentException(
226226
"FLINK type is "
227227
+ currentType
228228
+ ", but arrow type is "
@@ -239,17 +239,13 @@ public void convertArrowToRowBatch() throws DorisException {
239239

240240
@VisibleForTesting
241241
public boolean doConvert(
242-
int col,
243-
int rowIndex,
244-
Types.MinorType minorType,
245-
String currentType,
246-
FieldVector fieldVector)
242+
int col, int rowIndex, MinorType minorType, String currentType, FieldVector fieldVector)
247243
throws DorisException {
248244
switch (currentType) {
249245
case "NULL_TYPE":
250246
break;
251247
case "BOOLEAN":
252-
if (!minorType.equals(Types.MinorType.BIT)) {
248+
if (!minorType.equals(MinorType.BIT)) {
253249
return false;
254250
}
255251
BitVector bitVector = (BitVector) fieldVector;
@@ -258,57 +254,71 @@ public boolean doConvert(
258254
addValueToRow(rowIndex, fieldValue);
259255
break;
260256
case "TINYINT":
261-
if (!minorType.equals(Types.MinorType.TINYINT)) {
257+
if (!minorType.equals(MinorType.TINYINT)) {
262258
return false;
263259
}
264260
TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
265261
fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
266262
addValueToRow(rowIndex, fieldValue);
267263
break;
268264
case "SMALLINT":
269-
if (!minorType.equals(Types.MinorType.SMALLINT)) {
265+
if (!minorType.equals(MinorType.SMALLINT)) {
270266
return false;
271267
}
272268
SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
273269
fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
274270
addValueToRow(rowIndex, fieldValue);
275271
break;
276272
case "INT":
277-
if (!minorType.equals(Types.MinorType.INT)) {
273+
if (!minorType.equals(MinorType.INT)) {
278274
return false;
279275
}
280276
IntVector intVector = (IntVector) fieldVector;
281277
fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
282278
addValueToRow(rowIndex, fieldValue);
283279
break;
284280
case "IPV4":
285-
if (!minorType.equals(Types.MinorType.UINT4)
286-
&& !minorType.equals(Types.MinorType.INT)) {
281+
if (!minorType.equals(MinorType.UINT4)
282+
&& !minorType.equals(MinorType.INT)
283+
&& !minorType.equals(MinorType.VARCHAR)) {
287284
return false;
288285
}
289-
BaseIntVector ipv4Vector;
290-
if (minorType.equals(Types.MinorType.INT)) {
291-
ipv4Vector = (IntVector) fieldVector;
292286

287+
if (fieldVector.isNull(rowIndex)) {
288+
addValueToRow(rowIndex, null);
289+
break;
290+
}
291+
292+
if (minorType.equals(MinorType.VARCHAR)) {
293+
VarCharVector ipv4VarcharVector = (VarCharVector) fieldVector;
294+
String ipv4Str =
295+
new String(ipv4VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
296+
addValueToRow(rowIndex, ipv4Str);
293297
} else {
294-
ipv4Vector = (UInt4Vector) fieldVector;
298+
BaseIntVector ipv4Vector;
299+
if (minorType.equals(MinorType.INT)) {
300+
ipv4Vector = (IntVector) fieldVector;
301+
302+
} else {
303+
ipv4Vector = (UInt4Vector) fieldVector;
304+
}
305+
fieldValue =
306+
ipv4Vector.isNull(rowIndex)
307+
? null
308+
: convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
309+
addValueToRow(rowIndex, fieldValue);
295310
}
296-
fieldValue =
297-
ipv4Vector.isNull(rowIndex)
298-
? null
299-
: convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
300-
addValueToRow(rowIndex, fieldValue);
301311
break;
302312
case "BIGINT":
303-
if (!minorType.equals(Types.MinorType.BIGINT)) {
313+
if (!minorType.equals(MinorType.BIGINT)) {
304314
return false;
305315
}
306316
BigIntVector bigIntVector = (BigIntVector) fieldVector;
307317
fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
308318
addValueToRow(rowIndex, fieldValue);
309319
break;
310320
case "FLOAT":
311-
if (!minorType.equals(Types.MinorType.FLOAT4)) {
321+
if (!minorType.equals(MinorType.FLOAT4)) {
312322
return false;
313323
}
314324
Float4Vector float4Vector = (Float4Vector) fieldVector;
@@ -317,15 +327,15 @@ public boolean doConvert(
317327
break;
318328
case "TIME":
319329
case "DOUBLE":
320-
if (!minorType.equals(Types.MinorType.FLOAT8)) {
330+
if (!minorType.equals(MinorType.FLOAT8)) {
321331
return false;
322332
}
323333
Float8Vector float8Vector = (Float8Vector) fieldVector;
324334
fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
325335
addValueToRow(rowIndex, fieldValue);
326336
break;
327337
case "BINARY":
328-
if (!minorType.equals(Types.MinorType.VARBINARY)) {
338+
if (!minorType.equals(MinorType.VARBINARY)) {
329339
return false;
330340
}
331341
VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
@@ -339,7 +349,7 @@ public boolean doConvert(
339349
case "DECIMAL64":
340350
case "DECIMAL128I":
341351
case "DECIMAL128":
342-
if (!minorType.equals(Types.MinorType.DECIMAL)) {
352+
if (!minorType.equals(MinorType.DECIMAL)) {
343353
return false;
344354
}
345355
DecimalVector decimalVector = (DecimalVector) fieldVector;
@@ -352,11 +362,10 @@ public boolean doConvert(
352362
break;
353363
case "DATE":
354364
case "DATEV2":
355-
if (!minorType.equals(Types.MinorType.DATEDAY)
356-
&& !minorType.equals(Types.MinorType.VARCHAR)) {
365+
if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) {
357366
return false;
358367
}
359-
if (minorType.equals(Types.MinorType.VARCHAR)) {
368+
if (minorType.equals(MinorType.VARCHAR)) {
360369
VarCharVector date = (VarCharVector) fieldVector;
361370
if (date.isNull(rowIndex)) {
362371
addValueToRow(rowIndex, null);
@@ -376,7 +385,7 @@ public boolean doConvert(
376385
}
377386
break;
378387
case "DATETIME":
379-
if (minorType.equals(Types.MinorType.VARCHAR)) {
388+
if (minorType.equals(MinorType.VARCHAR)) {
380389
VarCharVector varCharVector = (VarCharVector) fieldVector;
381390
if (varCharVector.isNull(rowIndex)) {
382391
addValueToRow(rowIndex, null);
@@ -400,7 +409,7 @@ public boolean doConvert(
400409
}
401410
break;
402411
case "DATETIMEV2":
403-
if (minorType.equals(Types.MinorType.VARCHAR)) {
412+
if (minorType.equals(MinorType.VARCHAR)) {
404413
VarCharVector varCharVector = (VarCharVector) fieldVector;
405414
if (varCharVector.isNull(rowIndex)) {
406415
addValueToRow(rowIndex, null);
@@ -424,11 +433,11 @@ public boolean doConvert(
424433
}
425434
break;
426435
case "LARGEINT":
427-
if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY)
428-
&& !minorType.equals(Types.MinorType.VARCHAR)) {
436+
if (!minorType.equals(MinorType.FIXEDSIZEBINARY)
437+
&& !minorType.equals(MinorType.VARCHAR)) {
429438
return false;
430439
}
431-
if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
440+
if (minorType.equals(MinorType.FIXEDSIZEBINARY)) {
432441
FixedSizeBinaryVector largeIntVector = (FixedSizeBinaryVector) fieldVector;
433442
if (largeIntVector.isNull(rowIndex)) {
434443
addValueToRow(rowIndex, null);
@@ -464,7 +473,7 @@ public boolean doConvert(
464473
case "JSONB":
465474
case "JSON":
466475
case "VARIANT":
467-
if (!minorType.equals(Types.MinorType.VARCHAR)) {
476+
if (!minorType.equals(MinorType.VARCHAR)) {
468477
return false;
469478
}
470479
VarCharVector varCharVector = (VarCharVector) fieldVector;
@@ -477,21 +486,27 @@ public boolean doConvert(
477486
addValueToRow(rowIndex, stringValue);
478487
break;
479488
case "IPV6":
480-
if (!minorType.equals(Types.MinorType.VARCHAR)) {
489+
if (!minorType.equals(MinorType.VARCHAR)) {
481490
return false;
482491
}
483-
VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
484-
if (ipv6VarcharVector.isNull(rowIndex)) {
492+
493+
if (fieldVector.isNull(rowIndex)) {
485494
addValueToRow(rowIndex, null);
486495
break;
487496
}
497+
498+
VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
488499
String ipv6Str =
489500
new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
490-
String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str));
491-
addValueToRow(rowIndex, ipv6Address);
501+
if (ipv6Str.contains(":")) {
502+
addValueToRow(rowIndex, ipv6Str);
503+
} else {
504+
String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str));
505+
addValueToRow(rowIndex, ipv6Address);
506+
}
492507
break;
493508
case "ARRAY":
494-
if (!minorType.equals(Types.MinorType.LIST)) {
509+
if (!minorType.equals(MinorType.LIST)) {
495510
return false;
496511
}
497512
ListVector listVector = (ListVector) fieldVector;
@@ -501,7 +516,7 @@ public boolean doConvert(
501516
addValueToRow(rowIndex, listValue);
502517
break;
503518
case "MAP":
504-
if (!minorType.equals(Types.MinorType.MAP)) {
519+
if (!minorType.equals(MinorType.MAP)) {
505520
return false;
506521
}
507522
MapVector mapVector = (MapVector) fieldVector;
@@ -522,7 +537,7 @@ public boolean doConvert(
522537
addValueToRow(rowIndex, mapValue);
523538
break;
524539
case "STRUCT":
525-
if (!minorType.equals(Types.MinorType.STRUCT)) {
540+
if (!minorType.equals(MinorType.STRUCT)) {
526541
return false;
527542
}
528543
StructVector structVector = (StructVector) fieldVector;
@@ -627,7 +642,7 @@ public List<Object> next() {
627642
return rowBatch.get(offsetInRowBatch++).getCols();
628643
}
629644

630-
private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) {
645+
private String typeMismatchMessage(final String flinkType, final MinorType arrowType) {
631646
final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s.";
632647
return String.format(messageTemplate, flinkType, arrowType.name());
633648
}

0 commit comments

Comments
 (0)