Skip to content

Commit a9ba994

Browse files
committed
[FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source
1 parent 25c2404 commit a9ba994

File tree

5 files changed

+121
-22
lines changed

5 files changed

+121
-22
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ public JdbcSourceBuilder<OUT> setResultExtractor(ResultExtractor<OUT> resultExtr
143143
}
144144

145145
public JdbcSourceBuilder<OUT> setUsername(String username) {
146-
Preconditions.checkArgument(
147-
!StringUtils.isNullOrWhitespaceOnly(username),
148-
"It's required to set the 'username'.");
149146
connOptionsBuilder.withUsername(username);
150147
return this;
151148
}
@@ -180,6 +177,12 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
180177

181178
// ------ Optional ------------------------------------------------------------------
182179

180+
public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
181+
int connectionCheckTimeoutSeconds) {
182+
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
183+
return this;
184+
}
185+
183186
public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) {
184187
Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
185188
Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
126126
getJdbcReadOptions(helper.getOptions()),
127127
helper.getOptions().get(LookupOptions.MAX_RETRIES),
128128
getLookupCache(config),
129-
context.getPhysicalRowDataType());
129+
context.getPhysicalRowDataType(),
130+
context.getObjectIdentifier().asSummaryString());
130131
}
131132

132133
private static void validateDataTypeWithJdbcDialect(

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java

+71-16
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,26 @@
1919
package org.apache.flink.connector.jdbc.table;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
24+
import org.apache.flink.api.connector.source.Boundedness;
2225
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
2326
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
2427
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
28+
import org.apache.flink.connector.jdbc.source.JdbcSource;
29+
import org.apache.flink.connector.jdbc.source.JdbcSourceBuilder;
2530
import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
2631
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
2732
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
2833
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
34+
import org.apache.flink.streaming.api.datastream.DataStream;
35+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
36+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2937
import org.apache.flink.table.connector.ChangelogMode;
3038
import org.apache.flink.table.connector.Projection;
39+
import org.apache.flink.table.connector.ProviderContext;
40+
import org.apache.flink.table.connector.source.DataStreamScanProvider;
3141
import org.apache.flink.table.connector.source.DynamicTableSource;
32-
import org.apache.flink.table.connector.source.InputFormatProvider;
3342
import org.apache.flink.table.connector.source.LookupTableSource;
3443
import org.apache.flink.table.connector.source.ScanTableSource;
3544
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -38,6 +47,7 @@
3847
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
3948
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
4049
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
50+
import org.apache.flink.table.data.RowData;
4151
import org.apache.flink.table.expressions.CallExpression;
4252
import org.apache.flink.table.expressions.ResolvedExpression;
4353
import org.apache.flink.table.types.DataType;
@@ -68,6 +78,8 @@ public class JdbcDynamicTableSource
6878
SupportsFilterPushDown {
6979
private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
7080

81+
private static final String JDBC_TRANSFORMATION = "jdbc";
82+
7183
private final InternalJdbcConnectionOptions options;
7284
private final JdbcReadOptions readOptions;
7385
private final int lookupMaxRetryTimes;
@@ -77,19 +89,33 @@ public class JdbcDynamicTableSource
7789
private long limit = -1;
7890
private List<String> resolvedPredicates = new ArrayList<>();
7991
private Serializable[] pushdownParams = new Serializable[0];
92+
// The Nullable for iterative.
93+
@Nullable protected final String tableIdentifier;
8094

95+
@VisibleForTesting
8196
public JdbcDynamicTableSource(
8297
InternalJdbcConnectionOptions options,
8398
JdbcReadOptions readOptions,
8499
int lookupMaxRetryTimes,
85100
@Nullable LookupCache cache,
86101
DataType physicalRowDataType) {
102+
this(options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType, null);
103+
}
104+
105+
public JdbcDynamicTableSource(
106+
InternalJdbcConnectionOptions options,
107+
JdbcReadOptions readOptions,
108+
int lookupMaxRetryTimes,
109+
@Nullable LookupCache cache,
110+
DataType physicalRowDataType,
111+
String tableIdentifier) {
87112
this.options = options;
88113
this.readOptions = readOptions;
89114
this.lookupMaxRetryTimes = lookupMaxRetryTimes;
90115
this.cache = cache;
91116
this.physicalRowDataType = physicalRowDataType;
92117
this.dialectName = options.getDialect().dialectName();
118+
this.tableIdentifier = tableIdentifier;
93119
}
94120

95121
@Override
@@ -121,17 +147,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
121147
}
122148

123149
@Override
124-
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
125-
final JdbcRowDataInputFormat.Builder builder =
126-
JdbcRowDataInputFormat.builder()
127-
.setDrivername(options.getDriverName())
150+
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
151+
152+
final JdbcSourceBuilder<RowData> builder =
153+
JdbcSource.<RowData>builder()
154+
.setDriverName(options.getDriverName())
128155
.setDBUrl(options.getDbURL())
129156
.setUsername(options.getUsername().orElse(null))
130157
.setPassword(options.getPassword().orElse(null))
131158
.setAutoCommit(readOptions.getAutoCommit());
132159

133160
if (readOptions.getFetchSize() != 0) {
134-
builder.setFetchSize(readOptions.getFetchSize());
161+
builder.setResultSetFetchSize(readOptions.getFetchSize());
135162
}
136163
final JdbcDialect dialect = options.getDialect();
137164
String query =
@@ -153,13 +180,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
153180
.ofBatchNum(numPartitions),
154181
new JdbcGenericParameterValuesProvider(allPushdownParams));
155182

156-
builder.setParametersProvider(allParams);
183+
builder.setJdbcParameterValuesProvider(allParams);
157184

158185
predicates.add(
159186
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
160187
+ " BETWEEN ? AND ?");
161188
} else {
162-
builder.setParametersProvider(
189+
builder.setJdbcParameterValuesProvider(
163190
new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
164191
}
165192

@@ -179,13 +206,34 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
179206

180207
LOG.debug("Query generated for JDBC scan: " + query);
181208

182-
builder.setQuery(query);
209+
builder.setSql(query);
183210
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
184-
builder.setRowConverter(dialect.getRowConverter(rowType));
185-
builder.setRowDataTypeInfo(
186-
runtimeProviderContext.createTypeInformation(physicalRowDataType));
211+
builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
212+
builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
213+
options.getProperties()
214+
.forEach(
215+
(key, value) ->
216+
builder.setConnectionProperty(key.toString(), value.toString()));
217+
JdbcSource<RowData> source = builder.build();
218+
return new DataStreamScanProvider() {
219+
@Override
220+
public DataStream<RowData> produceDataStream(
221+
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
222+
String sourceName =
223+
Objects.isNull(tableIdentifier)
224+
? "JdbcSource"
225+
: "JdbcSource-" + tableIdentifier;
226+
DataStreamSource<RowData> sourceStream =
227+
execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName);
228+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
229+
return sourceStream;
230+
}
187231

188-
return InputFormatProvider.of(builder.build());
232+
@Override
233+
public boolean isBounded() {
234+
return source.getBoundedness() == Boundedness.BOUNDED;
235+
}
236+
};
189237
}
190238

191239
@Override
@@ -208,7 +256,12 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
208256
public DynamicTableSource copy() {
209257
JdbcDynamicTableSource newSource =
210258
new JdbcDynamicTableSource(
211-
options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
259+
options,
260+
readOptions,
261+
lookupMaxRetryTimes,
262+
cache,
263+
physicalRowDataType,
264+
tableIdentifier);
212265
newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
213266
newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
214267
return newSource;
@@ -236,7 +289,8 @@ public boolean equals(Object o) {
236289
&& Objects.equals(dialectName, that.dialectName)
237290
&& Objects.equals(limit, that.limit)
238291
&& Objects.equals(resolvedPredicates, that.resolvedPredicates)
239-
&& Arrays.deepEquals(pushdownParams, that.pushdownParams);
292+
&& Arrays.deepEquals(pushdownParams, that.pushdownParams)
293+
&& Objects.equals(tableIdentifier, that.tableIdentifier);
240294
}
241295

242296
@Override
@@ -250,7 +304,8 @@ public int hashCode() {
250304
dialectName,
251305
limit,
252306
resolvedPredicates,
253-
pushdownParams);
307+
pushdownParams,
308+
tableIdentifier);
254309
}
255310

256311
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.table;
20+
21+
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
22+
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.util.Preconditions;
25+
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
29+
/** The result extractor for {@link RowData}. */
30+
public class RowDataResultExtractor implements ResultExtractor<RowData> {
31+
32+
private final JdbcRowConverter jdbcRowConverter;
33+
34+
public RowDataResultExtractor(JdbcRowConverter jdbcRowConverter) {
35+
this.jdbcRowConverter = Preconditions.checkNotNull(jdbcRowConverter);
36+
}
37+
38+
@Override
39+
public RowData extract(ResultSet resultSet) throws SQLException {
40+
return jdbcRowConverter.toInternal(resultSet);
41+
}
42+
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@ void testSetResultSetFetchSize() {
116116
void testSetConnectionInfo() {
117117
assertThatThrownBy(() -> JdbcSource.builder().setDriverName(""))
118118
.isInstanceOf(IllegalArgumentException.class);
119-
assertThatThrownBy(() -> JdbcSource.builder().setUsername(""))
120-
.isInstanceOf(IllegalArgumentException.class);
121119
assertThatThrownBy(() -> JdbcSource.builder().setDBUrl(""))
122120
.isInstanceOf(IllegalArgumentException.class);
123121
}

0 commit comments

Comments
 (0)