Skip to content

Commit bae4709

Browse files
gk0916kanata163
authored andcommitted
fix: 添加reader配置中column数组长度和查询结果列长度的校验
1 parent 8a0f6da commit bae4709

File tree

8 files changed

+21
-8
lines changed

8 files changed

+21
-8
lines changed

flinkx-clickhouse/flinkx-clickhouse-reader/src/main/java/com/dtstack/flinkx/clickhouse/format/ClickhouseInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
7272
if(splitWithRowCol){
7373
columnCount = columnCount-1;
7474
}
75-
75+
checkSize(columnCount, metaColumns);
7676
hasNext = resultSet.next();
7777

7878
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);

flinkx-gbase/flinkx-gbase-reader/src/main/java/com/dtstack/flinkx/gbase/format/GbaseInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
6868
if(splitWithRowCol){
6969
columnCount = columnCount-1;
7070
}
71-
71+
checkSize(columnCount, metaColumns);
7272
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);
7373

7474
} catch (SQLException se) {

flinkx-mysql/flinkx-mysql-reader/src/main/java/com/dtstack/flinkx/mysql/format/MysqlInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
7272
if(splitWithRowCol){
7373
columnCount = columnCount-1;
7474
}
75-
75+
checkSize(columnCount, metaColumns);
7676
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);
7777

7878
} catch (SQLException se) {

flinkx-phoenix/flinkx-phoenix-reader/src/main/java/com/dtstack/flinkx/phoenix/format/PhoenixInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
7878
if(splitWithRowCol){
7979
columnCount = columnCount-1;
8080
}
81-
81+
checkSize(columnCount, metaColumns);
8282
hasNext = resultSet.next();
8383

8484
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);

flinkx-postgresql/flinkx-postgresql-reader/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
6868
if(splitWithRowCol){
6969
columnCount = columnCount-1;
7070
}
71-
71+
checkSize(columnCount, metaColumns);
7272
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);
7373

7474
} catch (SQLException se) {

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
186186
if (splitWithRowCol) {
187187
columnCount = columnCount - 1;
188188
}
189-
189+
checkSize(columnCount, metaColumns);
190190
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);
191191

192192
} catch (SQLException se) {
@@ -832,4 +832,17 @@ private void queryStartLocation() throws SQLException{
832832
}
833833
}
834834

835+
/**
836+
* 校验columnCount和metaColumns的长度是否相等
837+
* @param columnCount
838+
* @param metaColumns
839+
*/
840+
protected void checkSize(int columnCount, List<MetaColumn> metaColumns) {
841+
if (!ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName()) && columnCount != metaColumns.size()) {
842+
LOG.error("error config: column size for reader is {},but columns size for query result is {}." +
843+
" And the query sql is '{}'.",
844+
metaColumns.size(), columnCount, querySql);
845+
throw new RuntimeException("");
846+
}
847+
}
835848
}

flinkx-saphana/flinkx-saphana-reader/src/main/java/com/dtstack/flinkx/saphana/format/SaphanaInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
7676
if(splitWithRowCol){
7777
columnCount = columnCount-1;
7878
}
79-
79+
checkSize(columnCount, metaColumns);
8080
hasNext = resultSet.next();
8181

8282
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);

flinkx-teradata/flinkx-teradata-reader/src/main/java/com/dtstack/flinkx/teradata/format/TeradataInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
8080
if(splitWithRowCol){
8181
columnCount = columnCount-1;
8282
}
83-
83+
checkSize(columnCount, metaColumns);
8484
hasNext = resultSet.next();
8585

8686
descColumnTypeList = DbUtil.analyzeColumnType(resultSet);

0 commit comments

Comments
 (0)