Skip to content

Commit 50eaddc

Browse files
committed
[FLINK-35176][Connector/JDBC] Support property authentication connection for JDBC dynamic table
1 parent 16154ed commit 50eaddc

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Arrays;
4747
import java.util.HashSet;
4848
import java.util.Optional;
49+
import java.util.Properties;
4950
import java.util.Set;
5051
import java.util.stream.Collectors;
5152
import java.util.stream.Stream;
@@ -71,6 +72,7 @@
7172
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
7273
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
7374
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
75+
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getConnectionProperties;
7476

7577
/**
7678
* Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
@@ -153,6 +155,10 @@ private InternalJdbcConnectionOptions getJdbcOptions(
153155
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
154156
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
155157
readableConfig.getOptional(COMPATIBLE_MODE).ifPresent(builder::setCompatibleMode);
158+
final Properties connectionProperties = getConnectionProperties(readableConfig);
159+
connectionProperties
160+
.stringPropertyNames()
161+
.forEach(key -> builder.setProperty(key, connectionProperties.getProperty(key)));
156162
return builder.build();
157163
}
158164

0 commit comments

Comments
 (0)