Skip to content

Commit bb43b3c

Browse files
authored
[FLINK-33460][Connector/JDBC] Support property authentication connection. (#115)
1 parent 64c7b75 commit bb43b3c

File tree

7 files changed

+160
-40
lines changed

7 files changed

+160
-40
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.java

+33-14
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,39 @@
2020
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.util.Preconditions;
2222

23+
import javax.annotation.Nonnull;
2324
import javax.annotation.Nullable;
2425

2526
import java.io.Serializable;
27+
import java.util.Objects;
2628
import java.util.Optional;
29+
import java.util.Properties;
2730

2831
/** JDBC connection options. */
2932
@PublicEvolving
3033
public class JdbcConnectionOptions implements Serializable {
3134

35+
public static final String USER_KEY = "user";
36+
public static final String PASSWORD_KEY = "password";
37+
3238
private static final long serialVersionUID = 1L;
3339

3440
protected final String url;
3541
@Nullable protected final String driverName;
3642
protected final int connectionCheckTimeoutSeconds;
37-
@Nullable protected final String username;
38-
@Nullable protected final String password;
43+
@Nonnull protected final Properties properties;
3944

4045
protected JdbcConnectionOptions(
4146
String url,
4247
@Nullable String driverName,
43-
@Nullable String username,
44-
@Nullable String password,
45-
int connectionCheckTimeoutSeconds) {
48+
int connectionCheckTimeoutSeconds,
49+
@Nonnull Properties properties) {
4650
Preconditions.checkArgument(connectionCheckTimeoutSeconds > 0);
4751
this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
4852
this.driverName = driverName;
49-
this.username = username;
50-
this.password = password;
5153
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
54+
this.properties =
55+
Preconditions.checkNotNull(properties, "Connection properties must be non-null");
5256
}
5357

5458
public String getDbURL() {
@@ -61,24 +65,28 @@ public String getDriverName() {
6165
}
6266

6367
public Optional<String> getUsername() {
64-
return Optional.ofNullable(username);
68+
return Optional.ofNullable(properties.getProperty(USER_KEY));
6569
}
6670

6771
public Optional<String> getPassword() {
68-
return Optional.ofNullable(password);
72+
return Optional.ofNullable(properties.getProperty(PASSWORD_KEY));
6973
}
7074

7175
public int getConnectionCheckTimeoutSeconds() {
7276
return connectionCheckTimeoutSeconds;
7377
}
7478

79+
@Nonnull
80+
public Properties getProperties() {
81+
return properties;
82+
}
83+
7584
/** Builder for {@link JdbcConnectionOptions}. */
7685
public static class JdbcConnectionOptionsBuilder {
7786
private String url;
7887
private String driverName;
79-
private String username;
80-
private String password;
8188
private int connectionCheckTimeoutSeconds = 60;
89+
private final Properties properties = new Properties();
8290

8391
public JdbcConnectionOptionsBuilder withUrl(String url) {
8492
this.url = url;
@@ -90,13 +98,24 @@ public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
9098
return this;
9199
}
92100

101+
public JdbcConnectionOptionsBuilder withProperty(String propKey, String propVal) {
102+
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
103+
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");
104+
this.properties.put(propKey, propVal);
105+
return this;
106+
}
107+
93108
public JdbcConnectionOptionsBuilder withUsername(String username) {
94-
this.username = username;
109+
if (Objects.nonNull(username)) {
110+
this.properties.put(USER_KEY, username);
111+
}
95112
return this;
96113
}
97114

98115
public JdbcConnectionOptionsBuilder withPassword(String password) {
99-
this.password = password;
116+
if (Objects.nonNull(password)) {
117+
this.properties.put(PASSWORD_KEY, password);
118+
}
100119
return this;
101120
}
102121

@@ -114,7 +133,7 @@ public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
114133

115134
public JdbcConnectionOptions build() {
116135
return new JdbcConnectionOptions(
117-
url, driverName, username, password, connectionCheckTimeoutSeconds);
136+
url, driverName, connectionCheckTimeoutSeconds, properties);
118137
}
119138
}
120139
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/JdbcConnectionProvider.java

+12
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import org.apache.flink.annotation.PublicEvolving;
2121

22+
import javax.annotation.Nonnull;
2223
import javax.annotation.Nullable;
2324

2425
import java.io.Serializable;
2526
import java.sql.Connection;
2627
import java.sql.SQLException;
28+
import java.util.Properties;
2729

2830
/** JDBC connection provider. */
2931
@PublicEvolving
@@ -36,6 +38,16 @@ public interface JdbcConnectionProvider extends Serializable, AutoCloseable {
3638
@Nullable
3739
Connection getConnection();
3840

41+
/**
42+
* Get existing connection properties.
43+
*
44+
* @return existing connection properties
45+
*/
46+
@Nonnull
47+
default Properties getProperties() {
48+
throw new UnsupportedOperationException();
49+
}
50+
3951
/**
4052
* Check whether possible existing connection is valid or not through {@link
4153
* Connection#isValid(int)}.

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27+
import javax.annotation.Nonnull;
2728
import javax.annotation.concurrent.NotThreadSafe;
2829

2930
import java.io.Serializable;
@@ -69,6 +70,12 @@ public Connection getConnection() {
6970
return connection;
7071
}
7172

73+
@Nonnull
74+
@Override
75+
public Properties getProperties() {
76+
return jdbcOptions.getProperties();
77+
}
78+
7279
@Override
7380
public boolean isConnectionValid() throws SQLException {
7481
return connection != null
@@ -109,17 +116,10 @@ public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundE
109116
return connection;
110117
}
111118
if (jdbcOptions.getDriverName() == null) {
112-
connection =
113-
DriverManager.getConnection(
114-
jdbcOptions.getDbURL(),
115-
jdbcOptions.getUsername().orElse(null),
116-
jdbcOptions.getPassword().orElse(null));
119+
connection = DriverManager.getConnection(jdbcOptions.getDbURL(), getProperties());
117120
} else {
118121
Driver driver = getLoadedDriver();
119-
Properties info = new Properties();
120-
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
121-
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
122-
connection = driver.connect(jdbcOptions.getDbURL(), info);
122+
connection = driver.connect(jdbcOptions.getDbURL(), getProperties());
123123
if (connection == null) {
124124
// Throw same exception as DriverManager.getConnection when no driver found to match
125125
// caller expectation.

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java

+26-17
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2323
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
2424
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
25+
import org.apache.flink.util.Preconditions;
2526

27+
import javax.annotation.Nonnull;
2628
import javax.annotation.Nullable;
2729

2830
import java.util.Objects;
2931
import java.util.Optional;
32+
import java.util.Properties;
3033

3134
import static org.apache.flink.util.Preconditions.checkNotNull;
3235

@@ -44,12 +47,11 @@ private InternalJdbcConnectionOptions(
4447
String dbURL,
4548
String tableName,
4649
@Nullable String driverName,
47-
@Nullable String username,
48-
@Nullable String password,
4950
JdbcDialect dialect,
5051
@Nullable Integer parallelism,
51-
int connectionCheckTimeoutSeconds) {
52-
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
52+
int connectionCheckTimeoutSeconds,
53+
@Nonnull Properties properties) {
54+
super(dbURL, driverName, connectionCheckTimeoutSeconds, properties);
5355
this.tableName = tableName;
5456
this.dialect = dialect;
5557
this.parallelism = parallelism;
@@ -78,13 +80,12 @@ public boolean equals(Object o) {
7880
return Objects.equals(url, options.url)
7981
&& Objects.equals(tableName, options.tableName)
8082
&& Objects.equals(driverName, options.driverName)
81-
&& Objects.equals(username, options.username)
82-
&& Objects.equals(password, options.password)
8383
&& Objects.equals(
8484
dialect.getClass().getName(), options.dialect.getClass().getName())
8585
&& Objects.equals(parallelism, options.parallelism)
8686
&& Objects.equals(
87-
connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds);
87+
connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds)
88+
&& Objects.equals(properties, options.properties);
8889
} else {
8990
return false;
9091
}
@@ -96,11 +97,10 @@ public int hashCode() {
9697
url,
9798
tableName,
9899
driverName,
99-
username,
100-
password,
101100
dialect.getClass().getName(),
102101
parallelism,
103-
connectionCheckTimeoutSeconds);
102+
connectionCheckTimeoutSeconds,
103+
properties);
104104
}
105105

106106
/** Builder of {@link InternalJdbcConnectionOptions}. */
@@ -110,11 +110,10 @@ public static class Builder {
110110
private String tableName;
111111
private String driverName;
112112
private String compatibleMode;
113-
private String username;
114-
private String password;
115113
private JdbcDialect dialect;
116114
private Integer parallelism;
117115
private int connectionCheckTimeoutSeconds = 60;
116+
private final Properties properties = new Properties();
118117

119118
/**
120119
* optional, specifies the classloader to use in the planner for load the class in user jar.
@@ -137,13 +136,17 @@ public Builder setTableName(String tableName) {
137136

138137
/** optional, user name. */
139138
public Builder setUsername(String username) {
140-
this.username = username;
139+
if (Objects.nonNull(username)) {
140+
this.properties.put(USER_KEY, username);
141+
}
141142
return this;
142143
}
143144

144145
/** optional, password. */
145146
public Builder setPassword(String password) {
146-
this.password = password;
147+
if (Objects.nonNull(password)) {
148+
this.properties.put(PASSWORD_KEY, password);
149+
}
147150
return this;
148151
}
149152

@@ -188,6 +191,13 @@ public Builder setParallelism(Integer parallelism) {
188191
return this;
189192
}
190193

194+
public Builder setProperty(String propKey, String propVal) {
195+
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
196+
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");
197+
this.properties.put(propKey, propVal);
198+
return this;
199+
}
200+
191201
public InternalJdbcConnectionOptions build() {
192202
checkNotNull(dbURL, "No dbURL supplied.");
193203
checkNotNull(tableName, "No tableName supplied.");
@@ -208,11 +218,10 @@ public InternalJdbcConnectionOptions build() {
208218
dialect.appendDefaultUrlProperties(dbURL),
209219
tableName,
210220
driverName,
211-
username,
212-
password,
213221
dialect,
214222
parallelism,
215-
connectionCheckTimeoutSeconds);
223+
connectionCheckTimeoutSeconds,
224+
properties);
216225
}
217226
}
218227
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+7
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,13 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
180180

181181
// ------ Optional ------------------------------------------------------------------
182182

183+
public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) {
184+
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
185+
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");
186+
connOptionsBuilder.withProperty(propKey, propVal);
187+
return this;
188+
}
189+
183190
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
184191
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
185192
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.junit.jupiter.api.Test;
2323

24+
import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.USER_KEY;
25+
import static org.assertj.core.api.Assertions.assertThat;
2426
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2527

2628
/** Tests for {@link JdbcConnectionOptions}. */
@@ -58,4 +60,31 @@ void testInvalidCheckTimeoutSeconds() {
5860
.build())
5961
.isInstanceOf(IllegalArgumentException.class);
6062
}
63+
64+
@Test
65+
void testConnectionProperty() {
66+
// test for null connection properties
67+
JdbcConnectionOptions options =
68+
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
69+
.withUrl(FakeDBUtils.TEST_DB_URL)
70+
.build();
71+
assertThat(options.getProperties()).isEmpty();
72+
73+
// test for useful connection properties
74+
options =
75+
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
76+
.withUrl(FakeDBUtils.TEST_DB_URL)
77+
.withProperty("keyA", "valueA")
78+
.build();
79+
assertThat(options.getProperties()).hasSize(1);
80+
81+
options =
82+
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
83+
.withUrl(FakeDBUtils.TEST_DB_URL)
84+
.withUsername("user")
85+
.withProperty("keyA", "valueA")
86+
.build();
87+
assertThat(options.getProperties()).hasSize(2);
88+
assertThat(options.getProperties()).hasFieldOrPropertyWithValue(USER_KEY, "user");
89+
}
6190
}

0 commit comments

Comments
 (0)