Skip to content

Commit 165a969

Browse files
qingfei1994jennychen
and
jennychen
authored
[ISSUE-154] Support Array[Tuple] clickhouse type (#155)
* [ISSUE-154] Support Array[Tuple] clickhouse type * [ISSUE-154] Support Array[Tuple] clickhouse type * [ISSUE-154] Support Array[Tuple] clickhouse type for clickhousecatalog --------- Co-authored-by: jennychen <[email protected]>
1 parent 9583f2d commit 165a969

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.flink.table.data.DecimalData;
2222
import org.apache.flink.table.data.GenericArrayData;
2323
import org.apache.flink.table.data.GenericMapData;
24+
import org.apache.flink.table.data.GenericRowData;
2425
import org.apache.flink.table.data.MapData;
26+
import org.apache.flink.table.data.RowData;
2527
import org.apache.flink.table.data.StringData;
2628
import org.apache.flink.table.data.TimestampData;
2729
import org.apache.flink.table.types.logical.ArrayType;
2830
import org.apache.flink.table.types.logical.DecimalType;
2931
import org.apache.flink.table.types.logical.LogicalType;
3032
import org.apache.flink.table.types.logical.MapType;
33+
import org.apache.flink.table.types.logical.RowType;
3134

3235
import com.clickhouse.data.value.UnsignedByte;
3336
import com.clickhouse.data.value.UnsignedInteger;
@@ -46,7 +49,9 @@
4649
import java.time.LocalDateTime;
4750
import java.time.LocalTime;
4851
import java.time.OffsetDateTime;
52+
import java.util.ArrayList;
4953
import java.util.HashMap;
54+
import java.util.List;
5055
import java.util.Map;
5156
import java.util.UUID;
5257

@@ -119,9 +124,18 @@ public static Object toExternal(Object value, LogicalType type) {
119124
toExternal(valueGetter.getElementOrNull(valueArrayData, i), valueType));
120125
}
121126
return objectMap;
122-
case MULTISET:
123127
case ROW:
128+
List<Object> result = new ArrayList<>();
129+
for (int i = 0; i < ((RowData) value).getArity(); i++) {
130+
result.add(
131+
toExternal(
132+
RowData.createFieldGetter(((RowType) type).getTypeAt(i), i)
133+
.getFieldOrNull((RowData) value),
134+
((RowType) type).getTypeAt(i)));
135+
}
136+
return result;
124137
case RAW:
138+
case MULTISET:
125139
default:
126140
throw new UnsupportedOperationException("Unsupported type:" + type);
127141
}
@@ -209,6 +223,12 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
209223
}
210224
return new GenericMapData(internalMap);
211225
case ROW:
226+
List<Object> row = (List<Object>) value;
227+
GenericRowData rowData = new GenericRowData(row.size());
228+
for (int i = 0; i < row.size(); i++) {
229+
rowData.setField(i, toInternal(row.get(i), type.getChildren().get(i)));
230+
}
231+
return rowData;
212232
case MULTISET:
213233
case RAW:
214234
default:

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.connector.clickhouse.util;
1919

20+
import org.apache.flink.api.java.tuple.Tuple2;
2021
import org.apache.flink.table.api.DataTypes;
2122
import org.apache.flink.table.catalog.exceptions.CatalogException;
2223
import org.apache.flink.table.types.DataType;
@@ -25,6 +26,7 @@
2526

2627
import java.util.regex.Matcher;
2728
import java.util.regex.Pattern;
29+
import java.util.stream.Collectors;
2830

2931
import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION;
3032

@@ -110,6 +112,12 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
110112
toFlinkType(clickHouseColumnInfo.getKeyInfo()),
111113
toFlinkType(clickHouseColumnInfo.getValueInfo()));
112114
case Tuple:
115+
return DataTypes.ROW(
116+
clickHouseColumnInfo.getNestedColumns().stream()
117+
.map((col) -> new Tuple2<>(col, toFlinkType(col)))
118+
.map(tuple -> DataTypes.FIELD(tuple.f0.getColumnName(), tuple.f1))
119+
.collect(Collectors.toList()));
120+
113121
case Nested:
114122
case AggregateFunction:
115123
default:

0 commit comments

Comments
 (0)