Skip to content

Commit fccd660

Browse files
committed
[flinkStremSql][415]
1.支持elasticsearch7 sink以及side,并完善其文档 2.支持脏数据管理 3.支持file source 4.impla插件批量写入失败时改为单条insert 5.支持http sink
1 parent 10dd034 commit fccd660

File tree

7 files changed

+36
-14
lines changed

7 files changed

+36
-14
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,7 @@ FlinkStreamSQL
4141

4242
## 如何贡献FlinkStreamSQL
4343

44+
[pr规范](docs/pr.md)
45+
4446
## License
4547
FlinkStreamSQL is under the Apache 2.0 license. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for details.

docs/pr.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
## PR规范
2+
3+
1. 建立issue,描述相关问题信息
4+
2. 基于对应的release分支拉取开发分支
5+
3. commit 信息:[type-issueid] [module] msg
6+
1. type 类别
7+
2. feat:表示是一个新功能(feature)
8+
3. hotfix:hotfix,修补bug
9+
4. docs:改动、增加文档
10+
5. opt:修改代码风格及opt imports这些,不改动原有执行的代码
11+
6. test:增加测试
12+
13+
<br/>
14+
15+
eg:
16+
[hotfix-31280][core] 修复bigdecimal转decimal运行失败问题
17+
[feat-31372][rdb] RDB结果表Upsert模式支持选择更新策略
18+
19+
1. 多次提交使用rebase 合并成一个。
20+
2. pr 名称:[flinkx-issueid][module名称] 标题

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.core.rdb;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.resource.ResourceCheck;
2323
import org.apache.commons.lang.StringUtils;
2424
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -109,7 +109,7 @@ public void checkPrivilege(
109109
, String schema
110110
, List<String> privilegeList) {
111111
Connection connection =
112-
JdbcConnectUtil.getConnectWithRetry(driverName, url, userName, password);
112+
JdbcConnectionUtil.getConnectWithRetry(driverName, url, userName, password);
113113
Statement statement = null;
114114
String tableInfo = Objects.isNull(schema) ? tableName : schema + "." + tableName;
115115
String privilege = null;
@@ -133,7 +133,7 @@ public void checkPrivilege(
133133

134134
throw new SuppressRestartsException(new IllegalArgumentException(sqlException.getMessage()));
135135
} finally {
136-
JdbcConnectUtil.closeConnectionResource(null, statement, connection, false);
136+
JdbcConnectionUtil.closeConnectionResource(null, statement, connection, false);
137137
}
138138
}
139139
}

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java renamed to rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectionUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@
3838
* Date 2020-12-25
3939
* Company dtstack
4040
*/
41-
public class JdbcConnectUtil {
41+
public class JdbcConnectionUtil {
4242
private static final int DEFAULT_RETRY_NUM = 3;
4343
private static final long DEFAULT_RETRY_TIME_WAIT = 3L;
4444
private static final int DEFAULT_VALID_TIME = 10;
45-
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectUtil.class);
45+
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionUtil.class);
4646

4747
/**
4848
* 关闭连接资源
@@ -137,7 +137,7 @@ public static Connection getConnectWithRetry(
137137
+ "\nerror message: ");
138138
String errorCause = null;
139139

140-
ClassLoaderManager.forName(driverName, JdbcConnectUtil.class.getClassLoader());
140+
ClassLoaderManager.forName(driverName, JdbcConnectionUtil.class.getClassLoader());
141141
Preconditions.checkNotNull(url, "url can't be null!");
142142

143143
for (int i = 0; i < DEFAULT_RETRY_NUM; i++) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.dtstack.flink.sql.side.rdb.all;
2020

2121
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
22-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
22+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2323
import com.dtstack.flink.sql.side.BaseAllReqRow;
2424
import com.dtstack.flink.sql.side.BaseSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
@@ -208,7 +208,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
208208
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
209209
.add(oneRow);
210210
}
211-
JdbcConnectUtil.closeConnectionResource(resultSet, statement, connection, false);
211+
JdbcConnectionUtil.closeConnectionResource(resultSet, statement, connection, false);
212212
}
213213

214214
public int getFetchSize() {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
23-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
23+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2424
import com.dtstack.flink.sql.enums.EUpdateMode;
2525
import com.dtstack.flink.sql.exception.ExceptionTrace;
2626
import com.dtstack.flink.sql.factory.DTThreadFactory;
@@ -135,7 +135,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
135135

136136
public void openJdbc() throws IOException {
137137
try {
138-
connection = JdbcConnectUtil.getConnectWithRetry(
138+
connection = JdbcConnectionUtil.getConnectWithRetry(
139139
driverName,
140140
dbURL,
141141
username,
@@ -194,7 +194,7 @@ private void checkConnectionOpen() {
194194
try {
195195
if (!connection.isValid(10)) {
196196
LOG.info("db connection reconnect..");
197-
connection = JdbcConnectUtil.getConnectWithRetry(
197+
connection = JdbcConnectionUtil.getConnectWithRetry(
198198
driverName,
199199
dbURL,
200200
username,

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.writer;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.exception.ExceptionTrace;
2323
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2424
import org.apache.flink.api.java.tuple.Tuple2;
@@ -82,8 +82,8 @@ default void dealExecuteError(Connection connection,
8282
Row row,
8383
long errorLimit,
8484
Logger LOG) {
85-
JdbcConnectUtil.rollBack(connection);
86-
JdbcConnectUtil.commit(connection);
85+
JdbcConnectionUtil.rollBack(connection);
86+
JdbcConnectionUtil.commit(connection);
8787

8888
if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 ||
8989
LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)