Skip to content

Commit 7c6b310

Browse files
authored
Support nested arrays (#120)
* Support nested arrays * Rename placeholder types to avoid collision * Remove print lines
1 parent 3fa0e59 commit 7c6b310

File tree

6 files changed

+62
-32
lines changed

6 files changed

+62
-32
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ undeploy-samples: undeploy
4747
deploy-flink: deploy
4848
kubectl create namespace flink || echo "skipping"
4949
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
50-
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
50+
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
5151
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
5252
kubectl apply -f deploy/dev/flink-session-cluster.yaml
5353
kubectl apply -f deploy/dev/flink-sql-gateway.yaml
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
FROM apache/flink-kubernetes-operator:1.9.0
1+
FROM apache/flink-kubernetes-operator:1.11.0
22
COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java

+32-8
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
import org.apache.calcite.rel.type.RelDataType;
1111
import org.apache.calcite.rel.type.RelDataTypeFactory;
1212
import org.apache.calcite.rel.type.RelDataTypeField;
13+
import org.apache.calcite.sql.type.BasicSqlType;
1314
import org.apache.calcite.sql.type.SqlTypeName;
1415

1516

1617
public final class DataTypeUtils {
1718

18-
private static final String MAP_KEY_TYPE = "keyType";
19-
private static final String MAP_VALUE_TYPE = "valueType";
19+
private static final String ARRAY_TYPE = "__ARRTYPE__";
20+
private static final String MAP_KEY_TYPE = "__MAPKEYTYPE__";
21+
private static final String MAP_VALUE_TYPE = "__MAPVALUETYPE__";
2022

2123
private DataTypeUtils() {
2224
}
@@ -42,23 +44,39 @@ public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeF
4244

4345
private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType dataType,
4446
RelDataTypeFactory.Builder builder, List<String> path) {
45-
if (dataType.getComponentType() != null && dataType.getComponentType().isStruct()) {
46-
builder.add(String.join("$", path), typeFactory.createArrayType(
47-
typeFactory.createSqlType(SqlTypeName.ANY), -1));
48-
for (RelDataTypeField field : dataType.getComponentType().getFieldList()) {
49-
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
50-
Stream.of(field.getName())).collect(Collectors.toList()));
47+
if (dataType.getComponentType() != null) {
48+
// Handles ARRAY types
49+
if (dataType.getComponentType().isStruct()) {
50+
// Handles arrays of record types
51+
builder.add(String.join("$", path), typeFactory.createArrayType(
52+
typeFactory.createSqlType(SqlTypeName.ANY), -1));
53+
for (RelDataTypeField field : dataType.getComponentType().getFieldList()) {
54+
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
55+
Stream.of(field.getName())).collect(Collectors.toList()));
56+
}
57+
} else if (dataType.getComponentType() instanceof BasicSqlType) {
58+
// Handles primitive arrays
59+
builder.add(String.join("$", path), dataType);
60+
} else {
61+
// Handles nested arrays
62+
builder.add(String.join("$", path), typeFactory.createArrayType(
63+
typeFactory.createSqlType(SqlTypeName.ANY), -1));
64+
flattenInto(typeFactory, dataType.getComponentType(), builder, Stream.concat(path.stream(),
65+
Stream.of(ARRAY_TYPE)).collect(Collectors.toList()));
5166
}
5267
} else if (dataType.isStruct()) {
68+
// Handles Record types
5369
for (RelDataTypeField field : dataType.getFieldList()) {
5470
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
5571
Stream.of(field.getName())).collect(Collectors.toList()));
5672
}
5773
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
74+
// Handles map types
5875
builder.add(String.join("$", path) + "$" + MAP_KEY_TYPE, dataType.getKeyType());
5976
flattenInto(typeFactory, dataType.getValueType(), builder, Stream.concat(path.stream(),
6077
Stream.of(MAP_VALUE_TYPE)).collect(Collectors.toList()));
6178
} else {
79+
// Handles primitive types
6280
builder.add(String.join("$", path), dataType);
6381
}
6482
}
@@ -93,6 +111,12 @@ private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory
93111
return node.dataType;
94112
}
95113
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
114+
115+
// Placeholder to handle nested arrays
116+
if (node.children.size() == 1 && node.children.containsKey(ARRAY_TYPE)) {
117+
RelDataType nestedArrayType = buildRecord(node.children.get(ARRAY_TYPE), typeFactory);
118+
return typeFactory.createArrayType(nestedArrayType, -1);
119+
}
96120
// Placeholders to handle MAP type
97121
if (node.children.size() == 2
98122
&& node.children.containsKey(MAP_KEY_TYPE) && node.children.containsKey(MAP_VALUE_TYPE)) {

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.calcite.sql.fun.SqlRowOperator;
4040
import org.apache.calcite.sql.parser.SqlParserPos;
4141
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
42+
import org.apache.calcite.sql.type.BasicSqlType;
4243
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
4344
import org.apache.calcite.sql.type.SqlTypeName;
4445
import org.apache.calcite.sql.util.SqlShuttle;
@@ -453,14 +454,19 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
453454
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(
454455
new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes),
455456
dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
457+
} else if (dataType.getComponentType() instanceof BasicSqlType) {
458+
// To handle primitive ARRAY types, e.g. `FLOAT ARRAY`.
459+
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec(
460+
Optional.ofNullable(dataType.getComponentType())
461+
.map(RelDataType::getSqlTypeName)
462+
.orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO),
463+
dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
464+
} else {
465+
// To handle nested arrays
466+
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(
467+
toSpec(dataType.getComponentType()).getTypeNameSpec(),
468+
dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
456469
}
457-
458-
// To handle primitive ARRAY types, e.g. `FLOAT ARRAY`.
459-
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec(
460-
Optional.ofNullable(dataType.getComponentType())
461-
.map(RelDataType::getSqlTypeName)
462-
.orElseThrow(() -> new IllegalArgumentException("not a collection?")), SqlParserPos.ZERO),
463-
dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
464470
} else if (dataType.getKeyType() != null && dataType.getValueType() != null) {
465471
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlMapTypeNameSpec(
466472
toSpec(dataType.getKeyType()), toSpec(dataType.getValueType()), SqlParserPos.ZERO), SqlParserPos.ZERO));

hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package com.linkedin.hoptimator.util;
22

3-
import java.util.ArrayList;
43
import java.util.Arrays;
5-
import java.util.Collections;
6-
import java.util.List;
74
import java.util.Map;
85
import java.util.Properties;
96

@@ -13,7 +10,6 @@
1310
import static com.linkedin.hoptimator.util.DeploymentService.PIPELINE_OPTION;
1411
import static org.junit.jupiter.api.Assertions.assertEquals;
1512
import static org.junit.jupiter.api.Assertions.assertTrue;
16-
import static org.junit.jupiter.api.Assertions.assertFalse;
1713

1814

1915
class DeploymentServiceTest {

hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,24 @@ public void flattenUnflattenNestedArrays() {
6767
builder3.add("BAR", typeFactory.createArrayType(builder2.build(), -1));
6868
builder3.add("CAR", typeFactory.createArrayType(
6969
typeFactory.createSqlType(SqlTypeName.FLOAT), -1));
70+
builder3.add("DAY", typeFactory.createArrayType(typeFactory.createArrayType(typeFactory.createArrayType(
71+
typeFactory.createSqlType(SqlTypeName.VARCHAR), -1), -1), -1));
7072
RelDataType rowType = builder3.build();
71-
Assertions.assertEquals(3, rowType.getFieldList().size());
73+
Assertions.assertEquals(4, rowType.getFieldList().size());
7274
RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
73-
Assertions.assertEquals(6, flattenedType.getFieldList().size());
75+
Assertions.assertEquals(9, flattenedType.getFieldList().size());
7476
List<String> flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName)
7577
.collect(Collectors.toList());
76-
Assertions.assertIterableEquals(Arrays.asList("FOO", "FOO$QUX", "FOO$QIZ", "BAR", "BAR$BAZ", "CAR"),
77-
flattenedNames);
78+
Assertions.assertIterableEquals(Arrays.asList("FOO", "FOO$QUX", "FOO$QIZ", "BAR", "BAR$BAZ", "CAR", "DAY",
79+
"DAY$__ARRTYPE__", "DAY$__ARRTYPE__$__ARRTYPE__"), flattenedNames);
7880
String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
7981
flattenedType, Collections.emptyMap()).sql();
8082
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
8183
+ "`FOO` ANY ARRAY, `FOO_QUX` VARCHAR, `FOO_QIZ` VARCHAR ARRAY, "
8284
+ "`BAR` ANY ARRAY, `BAR_BAZ` VARCHAR, "
83-
+ "`CAR` FLOAT ARRAY) WITH ();", flattenedConnector,
84-
"Flattened connector should have simplified arrays");
85+
+ "`CAR` FLOAT ARRAY, "
86+
+ "`DAY` ANY ARRAY, `DAY___ARRTYPE__` ANY ARRAY, `DAY___ARRTYPE_____ARRTYPE__` VARCHAR ARRAY) WITH ();",
87+
flattenedConnector, "Flattened connector should have simplified arrays");
8588

8689
RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
8790
RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
@@ -94,7 +97,8 @@ public void flattenUnflattenNestedArrays() {
9497
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
9598
+ "`FOO` ROW(`QUX` VARCHAR, `QIZ` VARCHAR ARRAY) ARRAY, "
9699
+ "`BAR` ROW(`BAZ` VARCHAR) ARRAY, "
97-
+ "`CAR` FLOAT ARRAY) WITH ();", unflattenedConnector,
100+
+ "`CAR` FLOAT ARRAY, "
101+
+ "`DAY` VARCHAR ARRAY ARRAY ARRAY) WITH ();", unflattenedConnector,
98102
"Flattened-unflattened connector should be correct");
99103
}
100104

@@ -118,13 +122,13 @@ public void flattenUnflattenComplexMap() {
118122
Assertions.assertEquals(3, flattenedType.getFieldList().size());
119123
List<String> flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName)
120124
.collect(Collectors.toList());
121-
Assertions.assertIterableEquals(Arrays.asList("FOO$keyType", "FOO$valueType$QIZ$BAR", "FOO$valueType$QIZ$CAR"), flattenedNames);
125+
Assertions.assertIterableEquals(Arrays.asList("FOO$__MAPKEYTYPE__", "FOO$__MAPVALUETYPE__$QIZ$BAR", "FOO$__MAPVALUETYPE__$QIZ$CAR"), flattenedNames);
122126
String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
123127
flattenedType, Collections.emptyMap()).sql();
124128
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
125-
+ "`FOO_keyType` ROW(`QUX` VARCHAR), "
126-
+ "`FOO_valueType_QIZ_BAR` VARCHAR, "
127-
+ "`FOO_valueType_QIZ_CAR` INTEGER) WITH ();", flattenedConnector,
129+
+ "`FOO___MAPKEYTYPE__` ROW(`QUX` VARCHAR), "
130+
+ "`FOO___MAPVALUETYPE___QIZ_BAR` VARCHAR, "
131+
+ "`FOO___MAPVALUETYPE___QIZ_CAR` INTEGER) WITH ();", flattenedConnector,
128132
"Flattened connector should have simplified map");
129133

130134
RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);

0 commit comments

Comments
 (0)