Skip to content

Commit 41b96a8

Browse files
libailinzoudaokoulife
authored andcommitted
[hotfix-#1849][jdbc] fixed load data using lru method is empty, hide clear text passwords in logs, fixed build cache key when GenericRowData
1 parent 2d5f489 commit 41b96a8

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcSqlConverter.java

+20
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,26 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception {
130130
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
131131
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
132132
Object field = jsonArray.getValue(pos);
133+
// 当sql里声明的字段类型为BIGINT时,将BigInteger (BIGINT UNSIGNED) 转换为Long
134+
if (rowType.getFields()
135+
.get(pos)
136+
.getType()
137+
.getTypeRoot()
138+
.name()
139+
.equalsIgnoreCase("BIGINT")
140+
&& field instanceof BigInteger) {
141+
field = ((BigInteger) field).longValue();
142+
}
143+
// 当sql里声明的字段类型为INT时,将Long (INT UNSIGNED) 转换为Integer
144+
if (rowType.getFields()
145+
.get(pos)
146+
.getType()
147+
.getTypeRoot()
148+
.name()
149+
.equalsIgnoreCase("INTEGER")
150+
&& field instanceof Long) {
151+
field = ((Long) field).intValue();
152+
}
133153
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field));
134154
}
135155
return genericRowData;

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import com.dtstack.chunjun.lookup.config.LookupConfig;
3030
import com.dtstack.chunjun.throwable.NoRestartException;
3131
import com.dtstack.chunjun.util.DateUtil;
32+
import com.dtstack.chunjun.util.JsonUtil;
3233
import com.dtstack.chunjun.util.ThreadUtil;
3334

35+
import org.apache.flink.table.data.GenericRowData;
3436
import org.apache.flink.table.data.RowData;
3537
import org.apache.flink.table.functions.FunctionContext;
3638
import org.apache.flink.table.types.logical.RowType;
@@ -138,7 +140,8 @@ public void open(FunctionContext context) throws Exception {
138140
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE.defaultValue()),
139141
new ChunJunThreadFactory("rdbAsyncExec"),
140142
new ThreadPoolExecutor.CallerRunsPolicy());
141-
log.info("async dim table JdbcOptions info: {} ", jdbcConfig.toString());
143+
// 隐藏日志中明文密码
144+
log.info("async dim table JdbcOptions info: {} ", JsonUtil.toPrintJson(jdbcConfig));
142145
}
143146

144147
@Override
@@ -153,12 +156,19 @@ public void handleAsyncInvoke(CompletableFuture<Collection<RowData>> future, Obj
153156
Thread.sleep(100);
154157
}
155158

156-
executor.execute(
157-
() ->
158-
connectWithRetry(
159-
future,
160-
rdbSqlClient,
161-
Stream.of(keys).map(this::convertDataType).toArray(Object[]::new)));
159+
List<Object> keyList = new ArrayList<>();
160+
for (Object key : keys) {
161+
if (key instanceof GenericRowData) {
162+
GenericRowData genericRowData = (GenericRowData) key;
163+
for (int i = 0; i < genericRowData.getArity(); i++) {
164+
keyList.add(this.convertDataType(genericRowData.getField(i)));
165+
}
166+
} else {
167+
keyList.add(this.convertDataType(key));
168+
}
169+
}
170+
171+
executor.execute(() -> connectWithRetry(future, rdbSqlClient, keyList.toArray()));
162172
}
163173

164174
private Object convertDataType(Object val) {

chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java

+11
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.apache.flink.metrics.Counter;
3131
import org.apache.flink.runtime.execution.SuppressRestartsException;
32+
import org.apache.flink.table.data.GenericRowData;
3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.flink.table.functions.AsyncLookupFunction;
3435
import org.apache.flink.table.functions.FunctionContext;
@@ -279,6 +280,16 @@ public abstract void handleAsyncInvoke(
279280
* @return
280281
*/
281282
public String buildCacheKey(Object... keys) {
283+
if (keys != null && keys.length == 1 && keys[0] instanceof GenericRowData) {
284+
GenericRowData rowData = (GenericRowData) keys[0];
285+
int[] keyIndexes = new int[rowData.getArity()];
286+
for (int i = 0; i < rowData.getArity(); i++) {
287+
keyIndexes[i] = i;
288+
}
289+
return Arrays.stream(keyIndexes)
290+
.mapToObj(index -> String.valueOf(rowData.getField(index)))
291+
.collect(Collectors.joining("_"));
292+
}
282293
return Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_"));
283294
}
284295

chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public static String toPrintJson(Object obj) {
109109
try {
110110
Map<String, Object> result =
111111
objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class);
112-
MapUtil.replaceAllElement(result, Lists.newArrayList("pwd", "password"), "******");
112+
MapUtil.replaceAllElement(
113+
result, Lists.newArrayList("pwd", "password", "druid.password"), "******");
113114
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result);
114115
} catch (Exception e) {
115116
throw new RuntimeException("error parse [" + obj + "] to json", e);

0 commit comments

Comments
 (0)