Skip to content

Commit 0cda3c4

Browse files
authored
Merge pull request #416 from DTStack/feat_1.10_415
支持elasticsearch7 sink以及side,并完善其文档 支持脏数据管理 支持file source impala插件批量写入失败时改为单条insert 支持http sink
2 parents d49fdd4 + fccd660 commit 0cda3c4

File tree

56 files changed

+4097
-251
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+4097
-251
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,7 @@ FlinkStreamSQL
4141

4242
## 如何贡献FlinkStreamSQL
4343

44+
[pr规范](docs/pr.md)
45+
4446
## License
4547
FlinkStreamSQL is under the Apache 2.0 license. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for details.

core/src/main/java/com/dtstack/flink/sql/enums/ColumnType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public enum ColumnType {
102102
* timestamp
103103
*/
104104
TIMESTAMP,
105+
/**
106+
* time eg: 23:59:59
107+
*/
108+
TIME,
105109
/**
106110
* decimal
107111
*/

core/src/main/java/com/dtstack/flink/sql/exception/ExceptionTrace.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,4 @@ public static String traceOriginalCause(Throwable e) {
2020
}
2121
return errorMsg;
2222
}
23-
24-
/**
25-
* 根据异常的种类来判断是否需要强制跳过Flink的重启{@link SuppressRestartsException}
26-
* @param e exception
27-
* @param errorMsg 需要抛出的异常信息
28-
*/
29-
public static void dealExceptionWithSuppressStart(Exception e, String errorMsg) {
30-
if (e instanceof SuppressRestartsException) {
31-
throw new SuppressRestartsException(
32-
new Throwable(
33-
errorMsg
34-
)
35-
);
36-
} else {
37-
throw new RuntimeException(errorMsg);
38-
}
39-
}
4023
}

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ public List<PredicateInfo> getPredicateInfoes() {
193193
return predicateInfoes;
194194
}
195195

196+
public void addPredicateInfo(PredicateInfo predicateInfo) {
197+
this.predicateInfoes.add(predicateInfo);
198+
}
199+
196200
public Long getAsyncFailMaxNum(Long defaultValue) {
197201
return Objects.isNull(asyncFailMaxNum) ? defaultValue : asyncFailMaxNum;
198202
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,14 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
137137
if (leftNode.getKind() == SqlKind.LITERAL) {
138138
evalConstantEquation(
139139
(SqlLiteral) leftNode,
140-
(SqlIdentifier) rightNode
140+
(SqlIdentifier) rightNode,
141+
sqlNode.getKind()
141142
);
142143
} else if (rightNode.getKind() == SqlKind.LITERAL) {
143144
evalConstantEquation(
144145
(SqlLiteral) rightNode,
145-
(SqlIdentifier) leftNode
146+
(SqlIdentifier) leftNode,
147+
sqlNode.getKind()
146148
);
147149
} else {
148150
SqlIdentifier left = (SqlIdentifier) leftNode;
@@ -179,20 +181,19 @@ private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTa
179181
* @param literal
180182
* @param identifier
181183
*/
182-
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
184+
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier, SqlKind sqlKind) {
183185
String tableName = identifier.getComponent(0).getSimple();
184186
checkSupport(identifier);
185187
String fieldName = identifier.getComponent(1).getSimple();
186188
Object constant = literal.getValue();
187-
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
188189
PredicateInfo predicate = PredicateInfo.builder()
189-
.setOperatorName("=")
190-
.setOperatorKind("EQUALS")
190+
.setOperatorName(sqlKind.sql)
191+
.setOperatorKind(sqlKind.name())
191192
.setOwnerTable(tableName)
192193
.setFieldName(fieldName)
193194
.setCondition(constant.toString())
194195
.build();
195-
predicateInfos.add(predicate);
196+
sideTableInfo.addPredicateInfo(predicate);
196197
}
197198

198199
private void checkSupport(SqlIdentifier identifier) {

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@
2121
package com.dtstack.flink.sql.util;
2222

2323
import com.dtstack.flink.sql.enums.ColumnType;
24-
import org.apache.commons.lang3.StringUtils;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
2525
import com.google.common.base.Strings;
2626
import com.google.common.collect.Maps;
27-
import com.fasterxml.jackson.databind.ObjectMapper;
28-
import java.sql.Timestamp;
27+
import org.apache.commons.lang3.StringUtils;
28+
2929
import java.math.BigDecimal;
30+
import java.sql.Timestamp;
3031
import java.util.ArrayList;
3132
import java.util.List;
3233
import java.util.Map;
34+
import java.util.Objects;
3335
import java.util.regex.Matcher;
3436
import java.util.regex.Pattern;
3537

@@ -241,6 +243,9 @@ public static String col2string(Object column, String type) {
241243
case DATE:
242244
result = DateUtil.dateToString((java.util.Date)column);
243245
break;
246+
case TIME:
247+
result = DateUtil.getTimeFromStr(String.valueOf(column));
248+
break;
244249
case TIMESTAMP:
245250
result = DateUtil.timestampToString((java.util.Date)column);
246251
break;
@@ -411,4 +416,14 @@ public static String removeStartAndEndQuota(String str) {
411416
String removeStart = StringUtils.removeStart(str, "'");
412417
return StringUtils.removeEnd(removeStart, "'");
413418
}
419+
420+
/**
421+
* 判断当前对象是null 还是空格
422+
*
423+
* @param obj 需要判断的对象
424+
* @return 返回true 如果对象是空格或者为null
425+
*/
426+
public static boolean isEmptyOrNull(Object obj) {
427+
return Objects.isNull(obj) || obj.toString().isEmpty();
428+
}
414429
}

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.util;
2020

21+
import com.esotericsoftware.minlog.Log;
2122
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.security.UserGroupInformation;
2324
import org.apache.hadoop.security.authentication.util.KerberosName;

docs/plugin/elasticsearchSink.md renamed to docs/plugin/elasticsearch5Sink.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CREATE TABLE tableName(
99
cluster='clusterName',
1010
estype ='esType',
1111
index ='index',
12-
id ='num[,num]',
12+
id ='num[,num]'(id = 'field[,field]'),
1313
parallelism ='1'
1414
)
1515
```
@@ -27,19 +27,19 @@ CREATE TABLE tableName(
2727
## 4.参数:
2828
|参数名称|含义|是否必填|默认值|
2929
|----|---|---|----|
30-
|type|表明 输出表类型[mysq&#124;hbase&#124;elasticsearch]|||
31-
|address | 连接ES Transport地址(tcp地址)|||
30+
|type|表明 输出表类型[mysql&#124;hbase&#124;elasticsearch]||elasticsearch|
31+
|address | 连接ES Transport地址(tcp地址)||9300|
3232
|cluster | ES 集群名称 |||
3333
|index | 选择的ES上的index名称|||
3434
|estype | 选择ES上的type名称|||
35-
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id)|||
36-
|authMesh | 是否进行用户名密码认证 || false|
35+
|id | 生成id的规则(当前是根据指定的字段名称(或者字段position)获取字段信息,拼接生成id)|||
36+
|authMesh | 是否进行用户名密码认证(xpack认证) || false|
3737
|userName | 用户名 | 否,authMesh='true'时为必填 ||
3838
|password | 密码 | 否,authMesh='true'时为必填 ||
3939
|parallelism | 并行度设置||1|
4040

4141
## 5.样例:
42-
```
42+
```sql
4343
CREATE TABLE MyTable(
4444
channel varchar,
4545
pv varchar
@@ -61,14 +61,15 @@ CREATE TABLE MyResult(
6161
channel varchar
6262
)WITH(
6363
type ='elasticsearch',
64-
address ='172.16.8.193:9200',
64+
address ='172.16.8.193:9300',
6565
authMesh='true',
6666
username='elastic',
6767
password='abc123',
6868
estype ='external',
6969
cluster ='docker-cluster',
7070
index ='myresult',
71-
id ='1',
71+
id ='pv',
72+
-- id = '1' # 现支持position方式和属性名方式
7273
updateMode ='append',
7374
parallelism ='1'
7475
);

docs/plugin/elasticsearch6Side.md

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
2-
## 1.格式:
1+
## 1.格式
32
```
43
CREATE TABLE tableName(
54
colName cloType,
@@ -24,45 +23,49 @@
2423
```
2524

2625
# 2.支持版本
27-
elasticsearch 6.8.6
26+
27+
elasticsearch `6.x`
2828

2929
## 3.表结构定义
30-
31-
|参数名称|含义|
32-
|----|---|
33-
| tableName | elasticsearch表名称|
34-
| colName | 列名称|
35-
| colType | 列类型 [colType支持的类型](../colType.md)|
36-
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
37-
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
30+
31+
|参数名称|含义|
32+
|---|---|
33+
| tableName | elasticsearch表名称|
34+
| colName | 列名称|
35+
| colType | 列类型 [colType支持的类型](../colType.md)|
36+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
37+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
3838

3939
## 4.参数
4040

41-
|参数名称|含义|是否必填|默认值|
42-
|----|---|---|----|
43-
type|表明 输出表类型[elasticsearch6]|是||
44-
|address | 连接ES Transport地址(tcp地址)|||
45-
|cluster | ES 集群名称 |||
46-
|index | 选择的ES上的index名称|||
47-
|esType | 选择ES上的type名称|||
48-
|authMesh | 是否进行用户名密码认证 || false|
49-
|userName | 用户名 | 否,authMesh='true'时为必填 ||
50-
|password | 密码 | 否,authMesh='true'时为必填 ||
51-
| cache | 维表缓存策略(NONE/LRU)||NONE|
52-
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
53-
|parallelism | 并行度设置||1|
41+
|参数名称|含义|是否必填|默认值|
42+
|----|---|---|----|
43+
|type|表明 输出表类型[elasticsearch6]|||
44+
|address | 连接ES Http地址|||
45+
|cluster | ES 集群名称 |||
46+
|index | 选择的ES上的index名称|||
47+
|esType | 选择ES上的type名称|||
48+
|authMesh | 是否进行用户名密码认证 || false|
49+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
50+
|password | 密码 | 否,authMesh='true'时为必填 ||
51+
| cache | 维表缓存策略(NONE/LRU)||NONE|
52+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
53+
|parallelism | 并行度设置||1|
5454

55-
----------
56-
> 缓存策略
57-
* NONE: 不做内存缓存
58-
* LRU:
55+
56+
----------
57+
> 缓存策略
58+
59+
* NONE: 不做内存缓存
60+
* LRU:
5961
* cacheSize: 缓存的条目数量
6062
* cacheTTLMs:缓存的过期时间(ms)
6163
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
6264
* asyncCapacity:异步请求容量,默认1000
6365
* asyncTimeout:异步请求超时时间,默认10000毫秒
6466

6567
## 5.样例
68+
6669
```
6770
create table sideTable(
6871
channel varchar,
@@ -71,7 +74,7 @@ create table sideTable(
7174
PERIOD FOR SYSTEM_TIME
7275
)WITH(
7376
type ='elasticsearch6',
74-
address ='172.16.10.47:9500',
77+
address ='localhost:9200',
7578
cluster='es_47_menghan',
7679
estype ='type1',
7780
index ='xc_es_test',

docs/plugin/elasticsearch6Sink.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ CREATE TABLE tableName(
99
cluster='clusterName',
1010
esType ='esType',
1111
index ='index',
12-
id ='num[,num]',
12+
id ='num[,num]'(id = 'field[,field]'),
1313
authMesh = 'true',
1414
userName = 'userName',
1515
password = 'password',
1616
parallelism ='1'
1717
)
1818
```
1919
## 2.支持的版本
20-
elasticsearch 6.8.6
20+
elasticsearch `6.x`
2121

2222
## 3.表结构定义
2323

@@ -31,11 +31,11 @@ CREATE TABLE tableName(
3131
|参数名称|含义|是否必填|默认值|
3232
|----|---|---|----|
3333
|type|表明 输出表类型[elasticsearch6]|||
34-
|address | 连接ES Transport地址(tcp地址)|||
34+
|address | 连接ES Http地址|||
3535
|cluster | ES 集群名称 |||
3636
|index | 选择的ES上的index名称|||
3737
|esType | 选择ES上的type名称|||
38-
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
38+
|id | 生成id的规则(当前是根据指定的字段名称(或者字段position)获取字段信息,拼接生成id)|||
3939
| |若id为空字符串或索引都超出范围,则随机生成id值)|||
4040
|authMesh | 是否进行用户名密码认证 || false|
4141
|userName | 用户名 | 否,authMesh='true'时为必填 ||
@@ -73,8 +73,8 @@ CREATE TABLE MyResult(
7373
estype ='external',
7474
cluster ='docker-cluster',
7575
index ='myresult',
76-
id ='1',
77-
updateMode ='append',
76+
id ='pv',
77+
-- id = '1' # 现支持position方式和属性名方式
7878
parallelism ='1'
7979
);
8080

0 commit comments

Comments
 (0)