Commit 1609917

Merge branch '1.10_release_4.2.x' into 1.10_release
Conflicts:
- core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java
- core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java
- elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/Es6Util.java
- impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java
- pom.xml
- rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java
- rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectionUtil.java
- rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java
- rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbSinkParser.java
2 parents: 0cda3c4 + cc32339

68 files changed, +2534 −261 lines

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 2 additions & 3 deletions
```diff
@@ -138,8 +138,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
         }

         if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_PATH_STR))) {
-            dirtyProperties.put(PLUGIN_PATH_STR,
-                    Objects.isNull(remoteSqlPluginPath) ? localSqlPluginPath : remoteSqlPluginPath);
+            dirtyProperties.put(PLUGIN_PATH_STR, localSqlPluginPath);
         }

         List<URL> jarUrlList = getExternalJarUrls(options.getAddjar());
@@ -179,7 +178,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
         StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
         StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp());

-        ResourceCheck.NEED_CHECK = Boolean.parseBoolean(paramsInfo.getConfProp().getProperty(ResourceCheck.CHECK_STR, "true"));
+        ResourceCheck.NEED_CHECK = Boolean.parseBoolean(paramsInfo.getConfProp().getProperty(ResourceCheck.CHECK_STR, "false"));

         SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
         SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql(), paramsInfo.getPluginLoadMode());
```

core/src/main/java/com/dtstack/flink/sql/resource/ResourceCheck.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -29,7 +29,7 @@
  * @description: resource check
  **/
 public abstract class ResourceCheck {
-    public static Boolean NEED_CHECK = true;
+    public static Boolean NEED_CHECK = false;
     public static String CHECK_STR = "checkResource";
     protected static Logger LOG = LoggerFactory.getLogger(ResourceCheck.class);
     public String TABLE_TYPE_KEY = "tableType";
```
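
Taken together, these two diffs flip the resource check off by default: `ResourceCheck.NEED_CHECK` now starts as `false`, and `getStreamExecution` falls back to `"false"` when the `checkResource` property is absent. A minimal sketch of the resulting behavior; the `Properties` setup is illustrative, only the `checkResource` key and the `"false"` default come from the diffs above:

```java
import java.util.Properties;

public class ResourceCheckToggleSketch {
    public static void main(String[] args) {
        // Stand-in for paramsInfo.getConfProp(): jobs that still want the
        // pre-merge behavior must now opt in explicitly.
        Properties confProp = new Properties();
        confProp.setProperty("checkResource", "true");

        // Mirrors the updated line in getStreamExecution(); with no property set,
        // this now evaluates to false and table resource checks are skipped.
        boolean needCheck = Boolean.parseBoolean(confProp.getProperty("checkResource", "false"));
        System.out.println("resource check enabled: " + needCheck); // true
    }
}
```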

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -21,15 +21,13 @@
 import com.dtstack.flink.sql.dirtyManager.manager.DirtyKeys;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;

 /**
  * Reason:
@@ -43,6 +41,9 @@ public abstract class AbstractTableInfo implements Serializable {

     public static final String PARALLELISM_KEY = "parallelism";
     public static final String ERROR_LIMIT = "errorLimit";
+    public static final Boolean DEFAULT_FALSE = false;
+    public static final Boolean DEFAULT_TRUE = true;
+    public static final Object DEFAULT_NULL = null;
     private final List<String> fieldList = Lists.newArrayList();
     private final List<String> fieldTypeList = Lists.newArrayList();
     private final List<Class> fieldClassList = Lists.newArrayList();
```

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 7 additions & 4 deletions
```diff
@@ -21,10 +21,11 @@
 package com.dtstack.flink.sql.util;

 import com.dtstack.flink.sql.enums.ColumnType;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.util.Preconditions;

 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -46,7 +47,7 @@ public class DtStringUtil {

     private static final Pattern NO_VERSION_PATTERN = Pattern.compile("([a-zA-Z]+).*");

-    private static ObjectMapper objectMapper = new ObjectMapper();
+    private static final ObjectMapper objectMapper = new ObjectMapper();

     /**
      * Split the specified string delimiter --- ignored quotes delimiter
@@ -255,10 +256,12 @@ public static String col2string(Object column, String type) {
         return result.toString();
     }

-    public static String getPluginTypeWithoutVersion(String engineType){
+    public static String getPluginTypeWithoutVersion(String engineType) {
+        Preconditions.checkNotNull(engineType, "type can't be null!");

         Matcher matcher = NO_VERSION_PATTERN.matcher(engineType);
-        if(!matcher.find()){
+
+        if (!matcher.find()) {
             return engineType;
         }
```
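
For reference, a standalone sketch of what `getPluginTypeWithoutVersion` does to typical plugin types. The regex and the new null check come from the hunk above; the `matcher.group(1)` return is an assumption, since the tail of the method falls outside the hunk, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` to keep the snippet dependency-free:

```java
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PluginTypeSketch {
    // Same regex as DtStringUtil.NO_VERSION_PATTERN: leading letters, then the rest.
    private static final Pattern NO_VERSION_PATTERN = Pattern.compile("([a-zA-Z]+).*");

    public static String getPluginTypeWithoutVersion(String engineType) {
        // Stands in for the Preconditions.checkNotNull added in this merge.
        Objects.requireNonNull(engineType, "type can't be null!");
        Matcher matcher = NO_VERSION_PATTERN.matcher(engineType);
        if (!matcher.find()) {
            return engineType;
        }
        // Assumed: the part of the method outside the hunk returns the letter prefix.
        return matcher.group(1);
    }

    public static void main(String[] args) {
        System.out.println(getPluginTypeWithoutVersion("kafka11")); // kafka
        System.out.println(getPluginTypeWithoutVersion("mysql"));   // mysql
        // getPluginTypeWithoutVersion(null) now fails fast with "type can't be null!"
        // instead of an NPE from inside the regex matcher.
    }
}
```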

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 12 additions & 0 deletions
```diff
@@ -263,4 +263,16 @@ public static Timestamp getTimestamp(Object obj) {
         throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
     }

+    public static Character getChar(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof Character) {
+            return (Character) obj;
+        }
+        if (obj instanceof String) {
+            return String.valueOf(obj).charAt(0);
+        }
+        throw new RuntimeException("not support type of " + obj.getClass() + " convert to Char");
+    }
 }
```
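
A quick usage sketch of the new converter; the method body is copied from the hunk above so the snippet runs standalone. Note the one unguarded edge case: an empty `String` reaches `charAt(0)` without a length check.

```java
public class GetCharSketch {
    // Copied from the MathUtil.getChar added in this merge.
    public static Character getChar(Object obj) {
        if (obj == null) {
            return null;
        }
        if (obj instanceof Character) {
            return (Character) obj;
        }
        if (obj instanceof String) {
            return String.valueOf(obj).charAt(0);
        }
        throw new RuntimeException("not support type of " + obj.getClass() + " convert to Char");
    }

    public static void main(String[] args) {
        System.out.println(getChar('x'));   // x    (Character passes through)
        System.out.println(getChar("abc")); // a    (first character of a String)
        System.out.println(getChar(null));  // null
        // getChar(42) throws RuntimeException ("not support type of ... convert to Char").
        // getChar("") would throw StringIndexOutOfBoundsException, since charAt(0)
        // is called without a length check.
    }
}
```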

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 7 additions & 5 deletions
```diff
@@ -22,11 +22,9 @@

 import com.dtstack.flink.sql.dirtyManager.consumer.DirtyConsumerFactory;
 import com.dtstack.flink.sql.enums.EPluginLoadMode;
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -128,7 +126,11 @@ public static Map<String,Object> objectToMap(Object obj) throws Exception{
         return objectMapper.readValue(objectMapper.writeValueAsBytes(obj), Map.class);
     }

-    public static <T> T jsonStrToObject(String jsonStr, Class<T> clazz) throws JsonParseException, JsonMappingException, JsonGenerationException, IOException{
+    public static String objectToString(Object obj) throws JsonProcessingException {
+        return objectMapper.writeValueAsString(obj);
+    }
+
+    public static <T> T jsonStrToObject(String jsonStr, Class<T> clazz) throws IOException{
         return objectMapper.readValue(jsonStr, clazz);
     }
```
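
A round-trip sketch of `objectToString` / `jsonStrToObject` after the switch to Flink's shaded Jackson. The `Pojo` class is illustrative, and the snippet assumes flink-shaded-jackson is on the classpath, as it now is for this module:

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class JsonRoundTripSketch {
    // Illustrative POJO; public fields keep the Jackson mapping configuration-free.
    public static class Pojo {
        public String name;
        public int parallelism;
    }

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws IOException {
        Pojo in = new Pojo();
        in.name = "kafka";
        in.parallelism = 2;

        // objectToString(): writeValueAsString declares JsonProcessingException,
        // itself a subclass of IOException.
        String json = objectMapper.writeValueAsString(in);

        // jsonStrToObject(): the narrowed "throws IOException" still covers the three
        // removed Json*Exception types, since they all extend IOException.
        Pojo out = objectMapper.readValue(json, Pojo.class);
        System.out.println(out.name + ":" + out.parallelism); // kafka:2
    }
}
```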

core/src/main/java/com/dtstack/flink/sql/util/ThreadUtil.java

Lines changed: 9 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@
  */
 public class ThreadUtil {
     public static final Long DEFAULT_SLEEP_TIME = 10L;
+
     public static void sleepSeconds(long timeout) {
         try {
             TimeUnit.SECONDS.sleep(timeout);
@@ -50,4 +51,12 @@ public static void sleepMicroseconds(long timeout) {
             throw new RuntimeException(ie);
         }
     }
+
+    public static void sleepMilliseconds(long timeout) {
+        try {
+            TimeUnit.MILLISECONDS.sleep(timeout);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
```
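
The new `sleepMilliseconds` follows the same pattern as the existing `sleepSeconds` / `sleepMicroseconds` helpers. A small self-contained usage sketch (the method body is copied from the hunk above); one caveat worth knowing is that the catch block rethrows without calling `Thread.currentThread().interrupt()`, so the interrupt flag is swallowed:

```java
import java.util.concurrent.TimeUnit;

public class SleepSketch {
    // Copied from the ThreadUtil.sleepMilliseconds added in this merge.
    public static void sleepMilliseconds(long timeout) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeout);
        } catch (InterruptedException e) {
            // The interrupt status is not restored here; callers that coordinate
            // via interruption should catch this RuntimeException and re-interrupt.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        sleepMilliseconds(50);
        System.out.printf("slept ~%d ms%n", (System.nanoTime() - start) / 1_000_000);
    }
}
```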

docs/plugin/filesource.md

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
## 1.格式:
2+
3+
```
4+
CREATE TABLE tableName(
5+
colName colType,
6+
...
7+
)WITH(
8+
type ='file',
9+
format = 'csv',
10+
fieldDelimiter = ',',
11+
fileName = 'xxxx',
12+
filePath = 'xx/xxx',
13+
location = 'local',
14+
nullLiteral = 'null',
15+
allowComment = 'true',
16+
arrayElementDelimiter = ',',
17+
quoteCharacter = '"',
18+
escapeCharacter = '\\',
19+
ignoreParseErrors = 'true',
20+
hdfsSite = 'xxx/hdfs-site.xml',
21+
coreSite = 'xxx/core-site.xml',
22+
hdfsUser = 'root',
23+
charsetName = 'UTF-8'
24+
);
25+
```
26+
27+
## 2.支持的格式
28+
29+
支持 HDFS、 Local 支持 Csv、Json、Arvo 格式文件
30+
31+
## 3.表结构定义
32+
33+
|参数名称|含义|
34+
|----|---|
35+
| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称|
36+
| colName | 列名称|
37+
| colType | 列类型 [colType支持的类型](../colType.md)|
38+
39+
## 4.参数
40+
41+
通用参数设置
42+
43+
|参数名称|默认值|是否必填|参数说明|
44+
|----|---|---|---|
45+
|type|file||当前表的类型|
46+
|format|csv||文件格式,仅支持csv,json,Arvo类型|
47+
|fileName|||文件名|
48+
|filePath|||文件绝对路径|
49+
|location|local||文件存储介质,仅支持HDFS、Local|
50+
|charsetName|UTF-8||文件编码格式|
51+
52+
### 4.1 Csv 参数设置
53+
54+
|参数名称|默认值|是否必填|参数说明|
55+
|----|---|---|---|
56+
|ignoreParseErrors|true||是否忽略解析失败的数据|
57+
|fieldDelimiter|,||csv数据的分割符|
58+
|nullLiteral|"null"||填充csv数据中的null值|
59+
|allowComments|true|||
60+
|arrayElementDelimiter|,|||
61+
|quoteCharacter|"|||
62+
|escapeCharacter|\|||
63+
64+
### 4.2 Arvo 参数说明
65+
66+
|参数名称|默认值|是否必填|参数说明|
67+
|----|---|---|---|
68+
|avroFormat|||在format = 'arvo'的情况下是必填项|
69+
70+
### 4.3 HDFS 参数说明
71+
72+
|参数名称|默认值|是否必填|参数说明|
73+
|----|---|---|---|
74+
|hdfsSite|${HADOOP_CONF_HOME}/hdfs-site.xml||hdfs-site.xml所在位置|
75+
|coreSite|${HADOOP_CONF_HOME}/core-site.xml||core-site.xml所在位置|
76+
|hdfsUser|root||HDFS访问用户,默认是[root]用户|
77+
78+
### 4.4 Json 参数说明
79+
80+
Json无特殊参数
81+
82+
## 5.样例
83+
84+
数据展示:
85+
86+
csv
87+
88+
```csv
89+
712382,1/1/2017 0:00,1/1/2017 0:03,223,7051,Wellesley St E / Yonge St Green P,7089,Church St / Wood St,Member
90+
```
91+
92+
json
93+
94+
```json
95+
{
96+
"trip_id": "712382",
97+
"trip_start_time": "1/1/2017 0:00",
98+
"trip_stop_time": "1/1/2017 0:03",
99+
"trip_duration_seconds": "223",
100+
"from_station_id": "7051",
101+
"from_station_name": "Wellesley St E / Yonge St Green P",
102+
"to_station_id": "7089",
103+
"to_station_name": "Church St / Wood St",
104+
"user_type": "Member"
105+
},
106+
107+
```
108+
109+
### 5.1 csv
110+
111+
```sql
112+
CREATE TABLE SourceOne
113+
(
114+
trip_id varchar,
115+
trip_start_time varchar,
116+
trip_stop_time varchar,
117+
trip_duration_seconds varchar,
118+
from_station_id varchar,
119+
from_station_name varchar,
120+
to_station_id varchar,
121+
to_station_name varchar,
122+
user_type varchar
123+
) WITH (
124+
type = 'file',
125+
format = 'csv',
126+
fieldDelimiter = ',',
127+
fileName = '2017-Q1.csv',
128+
filePath = '/data',
129+
location = 'local',
130+
charsetName = 'UTF-8'
131+
);
132+
```
133+
134+
### 5.2 json
135+
136+
```sql
137+
CREATE TABLE SourceOne
138+
(
139+
trip_id varchar,
140+
trip_start_time varchar,
141+
trip_stop_time varchar,
142+
trip_duration_seconds varchar,
143+
from_station_id varchar,
144+
from_station_name varchar,
145+
to_station_id varchar,
146+
to_station_name varchar,
147+
user_type varchar
148+
) WITH (
149+
type = 'file',
150+
format = 'json',
151+
fieldDelimiter = ',',
152+
fileName = '2017-Q1.json',
153+
filePath = '/data',
154+
charsetName = 'UTF-8'
155+
);
156+
```
157+
158+
### 5.3 HDFS
159+
160+
```sql
161+
CREATE TABLE SourceOne
162+
(
163+
trip_id varchar,
164+
trip_start_time varchar,
165+
trip_stop_time varchar,
166+
trip_duration_seconds varchar,
167+
from_station_id varchar,
168+
from_station_name varchar,
169+
to_station_id varchar,
170+
to_station_name varchar,
171+
user_type varchar
172+
) WITH (
173+
type = 'file',
174+
format = 'json',
175+
fieldDelimiter = ',',
176+
fileName = '2017-Q1.json',
177+
filePath = 'hdfs://ns1/data',
178+
location = 'hdfs',
179+
hdfsSite = '/Users/wtz/dtstack/conf/yarn/kudu1/hdfs-site.xml',
180+
coreSite = '/Users/wtz/dtstack/conf/yarn/kudu1/core-site.xml',
181+
hdfsUser = 'admin',
182+
charsetName = 'UTF-8'
183+
);
184+
```
