Skip to content

Commit db7d938

Browse files
committed
[FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source
1 parent 0b8be88 commit db7d938

File tree

6 files changed

+131
-27
lines changed

6 files changed

+131
-27
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ public JdbcSourceBuilder<OUT> setResultExtractor(ResultExtractor<OUT> resultExtr
143143
}
144144

145145
public JdbcSourceBuilder<OUT> setUsername(String username) {
146-
Preconditions.checkArgument(
147-
!StringUtils.isNullOrWhitespaceOnly(username),
148-
"It's required to set the 'username'.");
149146
connOptionsBuilder.withUsername(username);
150147
return this;
151148
}
@@ -180,6 +177,12 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
180177

181178
// ------ Optional ------------------------------------------------------------------
182179

180+
public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
181+
int connectionCheckTimeoutSeconds) {
182+
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
183+
return this;
184+
}
185+
183186
public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) {
184187
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
185188
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
126126
getJdbcReadOptions(helper.getOptions()),
127127
helper.getOptions().get(LookupOptions.MAX_RETRIES),
128128
getLookupCache(config),
129-
context.getPhysicalRowDataType());
129+
context.getPhysicalRowDataType(),
130+
context.getObjectIdentifier().asSummaryString());
130131
}
131132

132133
private static void validateDataTypeWithJdbcDialect(

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

+70-16
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,25 @@
1919
package org.apache.flink.connector.jdbc.table;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
23+
import org.apache.flink.api.connector.source.Boundedness;
2224
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
2325
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
2426
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
27+
import org.apache.flink.connector.jdbc.source.JdbcSource;
28+
import org.apache.flink.connector.jdbc.source.JdbcSourceBuilder;
2529
import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
2630
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
2731
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
2832
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
33+
import org.apache.flink.streaming.api.datastream.DataStream;
34+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
35+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2936
import org.apache.flink.table.connector.ChangelogMode;
3037
import org.apache.flink.table.connector.Projection;
38+
import org.apache.flink.table.connector.ProviderContext;
39+
import org.apache.flink.table.connector.source.DataStreamScanProvider;
3140
import org.apache.flink.table.connector.source.DynamicTableSource;
32-
import org.apache.flink.table.connector.source.InputFormatProvider;
3341
import org.apache.flink.table.connector.source.LookupTableSource;
3442
import org.apache.flink.table.connector.source.ScanTableSource;
3543
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -38,6 +46,7 @@
3846
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
3947
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
4048
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
49+
import org.apache.flink.table.data.RowData;
4150
import org.apache.flink.table.expressions.CallExpression;
4251
import org.apache.flink.table.expressions.ResolvedExpression;
4352
import org.apache.flink.table.types.DataType;
@@ -68,6 +77,8 @@ public class JdbcDynamicTableSource
6877
SupportsFilterPushDown {
6978
private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
7079

80+
private static final String JDBC_TRANSFORMATION = "jdbc";
81+
7182
private final InternalJdbcConnectionOptions options;
7283
private final JdbcReadOptions readOptions;
7384
private final int lookupMaxRetryTimes;
@@ -77,19 +88,33 @@ public class JdbcDynamicTableSource
7788
private long limit = -1;
7889
private List<String> resolvedPredicates = new ArrayList<>();
7990
private Serializable[] pushdownParams = new Serializable[0];
91+
// Nullable to stay compatible with the deprecated constructor, which passes null.
92+
@Nullable protected final String tableIdentifier;
8093

94+
@Deprecated
8195
public JdbcDynamicTableSource(
8296
InternalJdbcConnectionOptions options,
8397
JdbcReadOptions readOptions,
8498
int lookupMaxRetryTimes,
8599
@Nullable LookupCache cache,
86100
DataType physicalRowDataType) {
101+
this(options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType, null);
102+
}
103+
104+
public JdbcDynamicTableSource(
105+
InternalJdbcConnectionOptions options,
106+
JdbcReadOptions readOptions,
107+
int lookupMaxRetryTimes,
108+
@Nullable LookupCache cache,
109+
DataType physicalRowDataType,
110+
String tableIdentifier) {
87111
this.options = options;
88112
this.readOptions = readOptions;
89113
this.lookupMaxRetryTimes = lookupMaxRetryTimes;
90114
this.cache = cache;
91115
this.physicalRowDataType = physicalRowDataType;
92116
this.dialectName = options.getDialect().dialectName();
117+
this.tableIdentifier = tableIdentifier;
93118
}
94119

95120
@Override
@@ -121,17 +146,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
121146
}
122147

123148
@Override
124-
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
125-
final JdbcRowDataInputFormat.Builder builder =
126-
JdbcRowDataInputFormat.builder()
127-
.setDrivername(options.getDriverName())
149+
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
150+
151+
final JdbcSourceBuilder<RowData> builder =
152+
JdbcSource.<RowData>builder()
153+
.setDriverName(options.getDriverName())
128154
.setDBUrl(options.getDbURL())
129155
.setUsername(options.getUsername().orElse(null))
130156
.setPassword(options.getPassword().orElse(null))
131157
.setAutoCommit(readOptions.getAutoCommit());
132158

133159
if (readOptions.getFetchSize() != 0) {
134-
builder.setFetchSize(readOptions.getFetchSize());
160+
builder.setResultSetFetchSize(readOptions.getFetchSize());
135161
}
136162
final JdbcDialect dialect = options.getDialect();
137163
String query =
@@ -153,13 +179,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
153179
.ofBatchNum(numPartitions),
154180
new JdbcGenericParameterValuesProvider(allPushdownParams));
155181

156-
builder.setParametersProvider(allParams);
182+
builder.setJdbcParameterValuesProvider(allParams);
157183

158184
predicates.add(
159185
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
160186
+ " BETWEEN ? AND ?");
161187
} else {
162-
builder.setParametersProvider(
188+
builder.setJdbcParameterValuesProvider(
163189
new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
164190
}
165191

@@ -179,13 +205,34 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
179205

180206
LOG.debug("Query generated for JDBC scan: " + query);
181207

182-
builder.setQuery(query);
208+
builder.setSql(query);
183209
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
184-
builder.setRowConverter(dialect.getRowConverter(rowType));
185-
builder.setRowDataTypeInfo(
186-
runtimeProviderContext.createTypeInformation(physicalRowDataType));
210+
builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
211+
builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
212+
options.getProperties()
213+
.forEach(
214+
(key, value) ->
215+
builder.setConnectionProperty(key.toString(), value.toString()));
216+
JdbcSource<RowData> source = builder.build();
217+
return new DataStreamScanProvider() {
218+
@Override
219+
public DataStream<RowData> produceDataStream(
220+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
221+
String sourceName =
222+
Objects.isNull(tableIdentifier)
223+
? "JdbcSource"
224+
: "JdbcSource-" + tableIdentifier;
225+
DataStreamSource<RowData> sourceStream =
226+
execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName);
227+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
228+
return sourceStream;
229+
}
187230

188-
return InputFormatProvider.of(builder.build());
231+
@Override
232+
public boolean isBounded() {
233+
return source.getBoundedness() == Boundedness.BOUNDED;
234+
}
235+
};
189236
}
190237

191238
@Override
@@ -208,7 +255,12 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
208255
public DynamicTableSource copy() {
209256
JdbcDynamicTableSource newSource =
210257
new JdbcDynamicTableSource(
211-
options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
258+
options,
259+
readOptions,
260+
lookupMaxRetryTimes,
261+
cache,
262+
physicalRowDataType,
263+
tableIdentifier);
212264
newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
213265
newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
214266
return newSource;
@@ -236,7 +288,8 @@ public boolean equals(Object o) {
236288
&& Objects.equals(dialectName, that.dialectName)
237289
&& Objects.equals(limit, that.limit)
238290
&& Objects.equals(resolvedPredicates, that.resolvedPredicates)
239-
&& Arrays.deepEquals(pushdownParams, that.pushdownParams);
291+
&& Arrays.deepEquals(pushdownParams, that.pushdownParams)
292+
&& Objects.equals(tableIdentifier, that.tableIdentifier);
240293
}
241294

242295
@Override
@@ -250,7 +303,8 @@ public int hashCode() {
250303
dialectName,
251304
limit,
252305
resolvedPredicates,
253-
pushdownParams);
306+
pushdownParams,
307+
tableIdentifier);
254308
}
255309

256310
@Override
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java

+42

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.table;
20+
21+
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
22+
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.util.Preconditions;
25+
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
29+
/** The result extractor for {@link RowData}. */
30+
public class RowDataResultExtractor implements ResultExtractor<RowData> {
31+
32+
private final JdbcRowConverter jdbcRowConverter;
33+
34+
public RowDataResultExtractor(JdbcRowConverter jdbcRowConverter) {
35+
this.jdbcRowConverter = Preconditions.checkNotNull(jdbcRowConverter);
36+
}
37+
38+
@Override
39+
public RowData extract(ResultSet resultSet) throws SQLException {
40+
return jdbcRowConverter.toInternal(resultSet);
41+
}
42+
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@ void testSetResultSetFetchSize() {
116116
void testSetConnectionInfo() {
117117
assertThatThrownBy(() -> JdbcSource.builder().setDriverName(""))
118118
.isInstanceOf(IllegalArgumentException.class);
119-
assertThatThrownBy(() -> JdbcSource.builder().setUsername(""))
120-
.isInstanceOf(IllegalArgumentException.class);
121119
assertThatThrownBy(() -> JdbcSource.builder().setDBUrl(""))
122120
.isInstanceOf(IllegalArgumentException.class);
123121
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.table.connector.source.DynamicTableSource;
3232
import org.apache.flink.table.connector.source.lookup.LookupOptions;
3333
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
34+
import org.apache.flink.table.factories.utils.FactoryMocks;
3435

3536
import org.junit.jupiter.api.Test;
3637

@@ -87,7 +88,8 @@ void testJdbcCommonProperties() {
8788
JdbcReadOptions.builder().build(),
8889
LookupOptions.MAX_RETRIES.defaultValue(),
8990
null,
90-
SCHEMA.toPhysicalRowDataType());
91+
SCHEMA.toPhysicalRowDataType(),
92+
FactoryMocks.IDENTIFIER.asSummaryString());
9193
assertThat(actualSource).isEqualTo(expectedSource);
9294

9395
// validation for sink
@@ -144,7 +146,8 @@ void testJdbcReadProperties() {
144146
readOptions,
145147
LookupOptions.MAX_RETRIES.defaultValue(),
146148
null,
147-
SCHEMA.toPhysicalRowDataType());
149+
SCHEMA.toPhysicalRowDataType(),
150+
FactoryMocks.IDENTIFIER.asSummaryString());
148151

149152
assertThat(actual).isEqualTo(expected);
150153
}
@@ -172,7 +175,8 @@ void testJdbcLookupProperties() {
172175
JdbcReadOptions.builder().build(),
173176
10,
174177
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
175-
SCHEMA.toPhysicalRowDataType());
178+
SCHEMA.toPhysicalRowDataType(),
179+
FactoryMocks.IDENTIFIER.asSummaryString());
176180

177181
assertThat(actual).isEqualTo(expected);
178182
}
@@ -200,7 +204,8 @@ void testJdbcLookupPropertiesWithLegacyOptions() {
200204
.maximumSize(1000L)
201205
.expireAfterWrite(Duration.ofSeconds(10))
202206
.build(),
203-
SCHEMA.toPhysicalRowDataType());
207+
SCHEMA.toPhysicalRowDataType(),
208+
FactoryMocks.IDENTIFIER.asSummaryString());
204209

205210
assertThat(actual).isEqualTo(expected);
206211
}
@@ -385,7 +390,8 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
385390
.maximumSize(1000L)
386391
.expireAfterWrite(Duration.ofSeconds(10))
387392
.build(),
388-
SCHEMA.toPhysicalRowDataType());
393+
SCHEMA.toPhysicalRowDataType(),
394+
FactoryMocks.IDENTIFIER.asSummaryString());
389395

390396
assertThat(actual).isEqualTo(expected);
391397
}

0 commit comments

Comments
 (0)