Skip to content

Commit 32f950a

Browse files
committed
[FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source
1 parent 0b8be88 commit 32f950a

File tree

4 files changed

+104
-20
lines changed

4 files changed

+104
-20
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ public JdbcSourceBuilder<OUT> setResultExtractor(ResultExtractor<OUT> resultExtr
143143
}
144144

145145
public JdbcSourceBuilder<OUT> setUsername(String username) {
146-
Preconditions.checkArgument(
147-
!StringUtils.isNullOrWhitespaceOnly(username),
148-
"It's required to set the 'username'.");
149146
connOptionsBuilder.withUsername(username);
150147
return this;
151148
}
@@ -180,6 +177,12 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
180177

181178
// ------ Optional ------------------------------------------------------------------
182179

180+
public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
181+
int connectionCheckTimeoutSeconds) {
182+
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
183+
return this;
184+
}
185+
183186
public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) {
184187
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
185188
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

+56-15
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,25 @@
1919
package org.apache.flink.connector.jdbc.table;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
23+
import org.apache.flink.api.connector.source.Boundedness;
2224
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
2325
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
2426
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
27+
import org.apache.flink.connector.jdbc.source.JdbcSource;
28+
import org.apache.flink.connector.jdbc.source.JdbcSourceBuilder;
2529
import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
2630
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
2731
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
2832
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
33+
import org.apache.flink.streaming.api.datastream.DataStream;
34+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
35+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2936
import org.apache.flink.table.connector.ChangelogMode;
3037
import org.apache.flink.table.connector.Projection;
38+
import org.apache.flink.table.connector.ProviderContext;
39+
import org.apache.flink.table.connector.source.DataStreamScanProvider;
3140
import org.apache.flink.table.connector.source.DynamicTableSource;
32-
import org.apache.flink.table.connector.source.InputFormatProvider;
3341
import org.apache.flink.table.connector.source.LookupTableSource;
3442
import org.apache.flink.table.connector.source.ScanTableSource;
3543
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -38,6 +46,7 @@
3846
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
3947
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
4048
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
49+
import org.apache.flink.table.data.RowData;
4150
import org.apache.flink.table.expressions.CallExpression;
4251
import org.apache.flink.table.expressions.ResolvedExpression;
4352
import org.apache.flink.table.types.DataType;
@@ -68,6 +77,8 @@ public class JdbcDynamicTableSource
6877
SupportsFilterPushDown {
6978
private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
7079

80+
private static final String JDBC_TRANSFORMATION = "jdbc";
81+
7182
private final InternalJdbcConnectionOptions options;
7283
private final JdbcReadOptions readOptions;
7384
private final int lookupMaxRetryTimes;
@@ -121,17 +132,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
121132
}
122133

123134
@Override
124-
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
125-
final JdbcRowDataInputFormat.Builder builder =
126-
JdbcRowDataInputFormat.builder()
127-
.setDrivername(options.getDriverName())
135+
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
136+
137+
final JdbcSourceBuilder<RowData> builder =
138+
JdbcSource.<RowData>builder()
139+
.setDriverName(options.getDriverName())
128140
.setDBUrl(options.getDbURL())
129141
.setUsername(options.getUsername().orElse(null))
130142
.setPassword(options.getPassword().orElse(null))
131143
.setAutoCommit(readOptions.getAutoCommit());
132144

133145
if (readOptions.getFetchSize() != 0) {
134-
builder.setFetchSize(readOptions.getFetchSize());
146+
builder.setResultSetFetchSize(readOptions.getFetchSize());
135147
}
136148
final JdbcDialect dialect = options.getDialect();
137149
String query =
@@ -153,19 +165,19 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
153165
.ofBatchNum(numPartitions),
154166
new JdbcGenericParameterValuesProvider(allPushdownParams));
155167

156-
builder.setParametersProvider(allParams);
168+
builder.setJdbcParameterValuesProvider(allParams);
157169

158170
predicates.add(
159171
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
160172
+ " BETWEEN ? AND ?");
161173
} else {
162-
builder.setParametersProvider(
174+
builder.setJdbcParameterValuesProvider(
163175
new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
164176
}
165177

166178
predicates.addAll(this.resolvedPredicates);
167179

168-
if (predicates.size() > 0) {
180+
if (!predicates.isEmpty()) {
169181
String joinedConditions =
170182
predicates.stream()
171183
.map(pred -> String.format("(%s)", pred))
@@ -179,13 +191,16 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
179191

180192
LOG.debug("Query generated for JDBC scan: " + query);
181193

182-
builder.setQuery(query);
194+
builder.setSql(query);
183195
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
184-
builder.setRowConverter(dialect.getRowConverter(rowType));
185-
builder.setRowDataTypeInfo(
186-
runtimeProviderContext.createTypeInformation(physicalRowDataType));
187-
188-
return InputFormatProvider.of(builder.build());
196+
builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
197+
builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
198+
options.getProperties()
199+
.forEach(
200+
(key, value) ->
201+
builder.setConnectionProperty(key.toString(), value.toString()));
202+
JdbcSource<RowData> source = builder.build();
203+
return new JdbcDataStreamScanProvider(source);
189204
}
190205

191206
@Override
@@ -295,4 +310,30 @@ private Serializable[][] replicatePushdownParamsForN(int n) {
295310
}
296311
return allPushdownParams;
297312
}
313+
314+
private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
315+
316+
private final JdbcSource<RowData> source;
317+
318+
public JdbcDataStreamScanProvider(JdbcSource<RowData> source) {
319+
this.source = Preconditions.checkNotNull(source);
320+
}
321+
322+
@Override
323+
public DataStream<RowData> produceDataStream(
324+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
325+
DataStreamSource<RowData> sourceStream =
326+
execEnv.fromSource(
327+
source,
328+
WatermarkStrategy.noWatermarks(),
329+
JdbcDynamicTableSource.class.getSimpleName());
330+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
331+
return sourceStream;
332+
}
333+
334+
@Override
335+
public boolean isBounded() {
336+
return source.getBoundedness() == Boundedness.BOUNDED;
337+
}
338+
}
298339
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.table;
20+
21+
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
22+
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.util.Preconditions;
25+
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
29+
/** The result extractor for {@link RowData}. */
30+
public class RowDataResultExtractor implements ResultExtractor<RowData> {
31+
32+
private final JdbcRowConverter jdbcRowConverter;
33+
34+
public RowDataResultExtractor(JdbcRowConverter jdbcRowConverter) {
35+
this.jdbcRowConverter = Preconditions.checkNotNull(jdbcRowConverter);
36+
}
37+
38+
@Override
39+
public RowData extract(ResultSet resultSet) throws SQLException {
40+
return jdbcRowConverter.toInternal(resultSet);
41+
}
42+
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@ void testSetResultSetFetchSize() {
116116
void testSetConnectionInfo() {
117117
assertThatThrownBy(() -> JdbcSource.builder().setDriverName(""))
118118
.isInstanceOf(IllegalArgumentException.class);
119-
assertThatThrownBy(() -> JdbcSource.builder().setUsername(""))
120-
.isInstanceOf(IllegalArgumentException.class);
121119
assertThatThrownBy(() -> JdbcSource.builder().setDBUrl(""))
122120
.isInstanceOf(IllegalArgumentException.class);
123121
}

0 commit comments

Comments
 (0)