Skip to content

Commit f389ba1

Browse files
author
xuchao
committed
Merge branch '1.8_release_3.9.x' into 1.8_release_3.10.x
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/Main.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents 738a565 + 911637a commit f389ba1

File tree

17 files changed

+1215
-740
lines changed

17 files changed

+1215
-740
lines changed

core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24-
import org.apache.calcite.config.Lex;
2524
import org.apache.calcite.sql.*;
26-
import org.apache.calcite.sql.parser.SqlParseException;
27-
import org.apache.calcite.sql.parser.SqlParser;
2825
import com.google.common.collect.Lists;
26+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
27+
2928
import java.util.List;
3029
import java.util.regex.Matcher;
3130
import java.util.regex.Pattern;
@@ -71,17 +70,12 @@ public void parseSql(String sql, SqlTree sqlTree) {
7170
tableName = matcher.group(1);
7271
selectSql = "select " + matcher.group(2);
7372
}
74-
75-
SqlParser.Config config = SqlParser
76-
.configBuilder()
77-
.setLex(Lex.MYSQL)
78-
.build();
79-
SqlParser sqlParser = SqlParser.create(selectSql,config);
73+
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
8074

8175
SqlNode sqlNode = null;
8276
try {
83-
sqlNode = sqlParser.parseStmt();
84-
} catch (SqlParseException e) {
77+
sqlNode = flinkPlanner.parse(selectSql);
78+
} catch (Exception e) {
8579
throw new RuntimeException("", e);
8680
}
8781

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.parser;
20+
21+
import org.apache.calcite.plan.RelOptPlanner;
22+
import org.apache.calcite.tools.FrameworkConfig;
23+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
24+
import org.apache.flink.table.calcite.FlinkTypeFactory;
25+
26+
/**
27+
* Date: 2020/3/31
28+
* Company: www.dtstack.com
29+
* @author maqi
30+
*/
31+
public class FlinkPlanner {
32+
33+
public static volatile FlinkPlannerImpl flinkPlanner;
34+
35+
private FlinkPlanner() {
36+
}
37+
38+
public static FlinkPlannerImpl createFlinkPlanner(FrameworkConfig frameworkConfig, RelOptPlanner relOptPlanner, FlinkTypeFactory typeFactory) {
39+
if (flinkPlanner == null) {
40+
synchronized (FlinkPlanner.class) {
41+
if (flinkPlanner == null) {
42+
flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory);
43+
}
44+
}
45+
}
46+
return flinkPlanner;
47+
}
48+
49+
public static FlinkPlannerImpl getFlinkPlanner() {
50+
return flinkPlanner;
51+
}
52+
}

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
5757
.configBuilder()
5858
.setLex(Lex.MYSQL)
5959
.build();
60+
6061
SqlParser sqlParser = SqlParser.create(sql,config);
6162
SqlNode sqlNode = null;
6263
try {

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,13 @@ public boolean equals(Object o) {
8585
public int hashCode() {
8686
return Objects.hash(table, fieldName);
8787
}
88+
89+
@Override
90+
public String toString() {
91+
return "FieldInfo{" +
92+
"table='" + table + '\'' +
93+
", fieldName='" + fieldName + '\'' +
94+
", typeInformation=" + typeInformation +
95+
'}';
96+
}
8897
}

core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.collect.HashBasedTable;
2424
import org.apache.commons.lang3.StringUtils;
2525

26+
2627
/**
2728
* 用于记录转换之后的表和原来表直接字段的关联关系
2829
* Date: 2018/8/30
@@ -78,7 +79,7 @@ public void setTargetTableAlias(String targetTableAlias) {
7879
* @param fieldName
7980
* @return
8081
*/
81-
public String getTargetFieldName(String tableName, String fieldName){
82+
public String getTargetFieldName(String tableName, String fieldName) {
8283
String targetFieldName = mappingTable.get(tableName, fieldName);
8384
if(StringUtils.isNotBlank(targetFieldName)){
8485
return targetFieldName;

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23+
import com.google.common.collect.HashBasedTable;
24+
import com.google.common.collect.Maps;
2325
import org.apache.calcite.sql.JoinType;
2426
import org.apache.calcite.sql.SqlNode;
2527
import com.google.common.base.Strings;
@@ -31,7 +33,6 @@
3133
* Join信息
3234
* Date: 2018/7/24
3335
* Company: www.dtstack.com
34-
*
3536
* @author xuchao
3637
*/
3738

@@ -40,9 +41,7 @@ public class JoinInfo implements Serializable {
4041
private static final long serialVersionUID = -1L;
4142

4243
//左表是否是维表
43-
private boolean leftIsSideTable;
44-
45-
private boolean leftIsTmpTable = false;
44+
private boolean leftIsSideTable = false;
4645

4746
//右表是否是维表
4847
private boolean rightIsSideTable;
@@ -67,6 +66,16 @@ public class JoinInfo implements Serializable {
6766

6867
private JoinType joinType;
6968

69+
/**
70+
* 左表需要查询的字段信息和output的时候对应的列名称
71+
*/
72+
private Map<String, String> leftSelectFieldInfo = Maps.newHashMap();
73+
74+
/**
75+
* 右表需要查询的字段信息和output的时候对应的列名称
76+
*/
77+
private Map<String, String> rightSelectFieldInfo = Maps.newHashMap();
78+
7079
public String getSideTableName(){
7180
if(leftIsSideTable){
7281
return leftTableAlias;
@@ -195,19 +204,39 @@ public void setJoinType(JoinType joinType) {
195204
this.joinType = joinType;
196205
}
197206

198-
public boolean isLeftIsTmpTable() {
199-
return leftIsTmpTable;
207+
public Map<String, String> getLeftSelectFieldInfo() {
208+
return leftSelectFieldInfo;
209+
}
210+
211+
public void setLeftSelectFieldInfo(Map<String, String> leftSelectFieldInfo) {
212+
this.leftSelectFieldInfo = leftSelectFieldInfo;
200213
}
201214

202-
public void setLeftIsTmpTable(boolean leftIsTmpTable) {
203-
this.leftIsTmpTable = leftIsTmpTable;
215+
public Map<String, String> getRightSelectFieldInfo() {
216+
return rightSelectFieldInfo;
217+
}
218+
219+
public void setRightSelectFieldInfo(Map<String, String> rightSelectFieldInfo) {
220+
this.rightSelectFieldInfo = rightSelectFieldInfo;
221+
}
222+
223+
public HashBasedTable<String, String, String> getTableFieldRef(){
224+
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
225+
getLeftSelectFieldInfo().forEach((key, value) -> {
226+
mappingTable.put(getLeftTableAlias(), key, value);
227+
});
228+
229+
getRightSelectFieldInfo().forEach((key, value) -> {
230+
mappingTable.put(getRightTableAlias(), key, value);
231+
});
232+
233+
return mappingTable;
204234
}
205235

206236
@Override
207237
public String toString() {
208238
return "JoinInfo{" +
209239
"leftIsSideTable=" + leftIsSideTable +
210-
", leftIsTmpTable=" + leftIsTmpTable +
211240
", rightIsSideTable=" + rightIsSideTable +
212241
", leftTableName='" + leftTableName + '\'' +
213242
", leftTableAlias='" + leftTableAlias + '\'' +

0 commit comments

Comments
 (0)