Skip to content

Commit 613b697

Browse files
yanghuaiGitFlechazoW
authored andcommitted
[hotfix]oracle type modify on sql & adjust to get unique key method (#1262)
1 parent 3f0c72e commit 613b697

File tree

4 files changed

+29
-7
lines changed

4 files changed

+29
-7
lines changed

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ protected void openInternal(int taskNumber, int numTasks) {
9898
List<String> updateKey = jdbcConf.getUniqueKey();
9999
if (CollectionUtils.isEmpty(updateKey)) {
100100
List<String> tableIndex =
101-
JdbcUtil.getTableIndex(
101+
JdbcUtil.getTableUniqueIndex(
102102
jdbcConf.getSchema(), jdbcConf.getTable(), dbConn);
103103
jdbcConf.setUniqueKey(tableIndex);
104104
LOG.info("updateKey = {}", JsonUtil.toJson(tableIndex));

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import org.apache.flink.table.types.logical.LogicalType;
3434

35+
import org.apache.commons.collections.CollectionUtils;
3536
import org.apache.commons.lang.StringUtils;
3637
import org.apache.commons.lang3.tuple.Pair;
3738
import org.slf4j.Logger;
@@ -177,6 +178,22 @@ public static List<String> getTableIndex(String schema, String tableName, Connec
177178
return indexList;
178179
}
179180

181+
public static List<String> getTableUniqueIndex(
182+
String schema, String tableName, Connection dbConn) throws SQLException {
183+
List<String> tablePrimaryKey = getTablePrimaryKey(schema, tableName, dbConn);
184+
if (CollectionUtils.isNotEmpty(tablePrimaryKey)) {
185+
return tablePrimaryKey;
186+
}
187+
188+
ResultSet rs = dbConn.getMetaData().getIndexInfo(null, schema, tableName, true, false);
189+
List<String> indexList = new LinkedList<>();
190+
while (rs.next()) {
191+
String index = rs.getString(9);
192+
if (StringUtils.isNotBlank(index)) indexList.add(index);
193+
}
194+
return indexList;
195+
}
196+
180197
/**
181198
* get primarykey
182199
*

chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleRowConverter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,17 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
152152
}
153153
};
154154
case CHAR:
155-
if (((CharType) type).getLength() > CLOB_LENGTH) {
155+
if (((CharType) type).getLength() > CLOB_LENGTH
156+
&& ((VarCharType) type).getLength() != Integer.MAX_VALUE) {
156157
return val -> {
157158
oracle.sql.CLOB clob = (oracle.sql.CLOB) val;
158159
return StringData.fromString(ConvertUtil.convertClob(clob));
159160
};
160161
}
161162
return val -> StringData.fromString(val.toString());
162163
case VARCHAR:
163-
if (((VarCharType) type).getLength() > CLOB_LENGTH) {
164+
if (((VarCharType) type).getLength() > CLOB_LENGTH
165+
&& ((VarCharType) type).getLength() != Integer.MAX_VALUE) {
164166
return val -> {
165167
oracle.sql.CLOB clob = (oracle.sql.CLOB) val;
166168
return StringData.fromString(ConvertUtil.convertClob(clob));
@@ -200,7 +202,8 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
200202
case DOUBLE:
201203
return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
202204
case CHAR:
203-
if (((CharType) type).getLength() > CLOB_LENGTH) {
205+
if (((CharType) type).getLength() > CLOB_LENGTH
206+
&& ((VarCharType) type).getLength() != Integer.MAX_VALUE) {
204207
return (val, index, statement) -> {
205208
try (StringReader reader =
206209
new StringReader(val.getString(index).toString())) {
@@ -213,7 +216,8 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
213216
};
214217
case VARCHAR:
215218
// value is BinaryString
216-
if (((VarCharType) type).getLength() > CLOB_LENGTH) {
219+
if (((VarCharType) type).getLength() > CLOB_LENGTH
220+
&& ((VarCharType) type).getLength() != Integer.MAX_VALUE) {
217221
return (val, index, statement) -> {
218222
try (StringReader reader =
219223
new StringReader(val.getString(index).toString())) {

chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@ public static DataType apply(String type) {
5252
case "NCHAR":
5353
case "NVARCHAR2":
5454
case "LONG":
55-
case "RAW":
56-
case "LONG RAW":
5755
case "BLOB":
5856
case "CLOB":
5957
case "NCLOB":
6058
case "INTERVAL YEAR":
6159
case "INTERVAL DAY":
6260
return DataTypes.STRING();
61+
case "RAW":
62+
case "LONG RAW":
63+
return DataTypes.BYTES();
6364
case "INT":
6465
case "INTEGER":
6566
return DataTypes.INT();

0 commit comments

Comments
 (0)