19
19
package org .apache .flink .connector .jdbc .table ;
20
20
21
21
import org .apache .flink .annotation .Internal ;
22
+ import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
23
+ import org .apache .flink .api .connector .source .Boundedness ;
22
24
import org .apache .flink .connector .jdbc .dialect .JdbcDialect ;
23
25
import org .apache .flink .connector .jdbc .internal .options .InternalJdbcConnectionOptions ;
24
26
import org .apache .flink .connector .jdbc .internal .options .JdbcReadOptions ;
27
+ import org .apache .flink .connector .jdbc .source .JdbcSource ;
28
+ import org .apache .flink .connector .jdbc .source .JdbcSourceBuilder ;
25
29
import org .apache .flink .connector .jdbc .split .CompositeJdbcParameterValuesProvider ;
26
30
import org .apache .flink .connector .jdbc .split .JdbcGenericParameterValuesProvider ;
27
31
import org .apache .flink .connector .jdbc .split .JdbcNumericBetweenParametersProvider ;
28
32
import org .apache .flink .connector .jdbc .split .JdbcParameterValuesProvider ;
33
+ import org .apache .flink .streaming .api .datastream .DataStream ;
34
+ import org .apache .flink .streaming .api .datastream .DataStreamSource ;
35
+ import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
29
36
import org .apache .flink .table .connector .ChangelogMode ;
30
37
import org .apache .flink .table .connector .Projection ;
38
+ import org .apache .flink .table .connector .ProviderContext ;
39
+ import org .apache .flink .table .connector .source .DataStreamScanProvider ;
31
40
import org .apache .flink .table .connector .source .DynamicTableSource ;
32
- import org .apache .flink .table .connector .source .InputFormatProvider ;
33
41
import org .apache .flink .table .connector .source .LookupTableSource ;
34
42
import org .apache .flink .table .connector .source .ScanTableSource ;
35
43
import org .apache .flink .table .connector .source .abilities .SupportsFilterPushDown ;
38
46
import org .apache .flink .table .connector .source .lookup .LookupFunctionProvider ;
39
47
import org .apache .flink .table .connector .source .lookup .PartialCachingLookupProvider ;
40
48
import org .apache .flink .table .connector .source .lookup .cache .LookupCache ;
49
+ import org .apache .flink .table .data .RowData ;
41
50
import org .apache .flink .table .expressions .CallExpression ;
42
51
import org .apache .flink .table .expressions .ResolvedExpression ;
43
52
import org .apache .flink .table .types .DataType ;
@@ -68,6 +77,8 @@ public class JdbcDynamicTableSource
68
77
SupportsFilterPushDown {
69
78
private static final Logger LOG = LoggerFactory .getLogger (JdbcDynamicTableSource .class );
70
79
80
+ private static final String JDBC_TRANSFORMATION = "jdbc" ;
81
+
71
82
private final InternalJdbcConnectionOptions options ;
72
83
private final JdbcReadOptions readOptions ;
73
84
private final int lookupMaxRetryTimes ;
@@ -121,17 +132,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
121
132
}
122
133
123
134
@ Override
124
- public ScanRuntimeProvider getScanRuntimeProvider (ScanContext runtimeProviderContext ) {
125
- final JdbcRowDataInputFormat .Builder builder =
126
- JdbcRowDataInputFormat .builder ()
127
- .setDrivername (options .getDriverName ())
135
+ public ScanRuntimeProvider getScanRuntimeProvider (ScanContext scanContext ) {
136
+
137
+ final JdbcSourceBuilder <RowData > builder =
138
+ JdbcSource .<RowData >builder ()
139
+ .setDriverName (options .getDriverName ())
128
140
.setDBUrl (options .getDbURL ())
129
141
.setUsername (options .getUsername ().orElse (null ))
130
142
.setPassword (options .getPassword ().orElse (null ))
131
143
.setAutoCommit (readOptions .getAutoCommit ());
132
144
133
145
if (readOptions .getFetchSize () != 0 ) {
134
- builder .setFetchSize (readOptions .getFetchSize ());
146
+ builder .setResultSetFetchSize (readOptions .getFetchSize ());
135
147
}
136
148
final JdbcDialect dialect = options .getDialect ();
137
149
String query =
@@ -153,19 +165,19 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
153
165
.ofBatchNum (numPartitions ),
154
166
new JdbcGenericParameterValuesProvider (allPushdownParams ));
155
167
156
- builder .setParametersProvider (allParams );
168
+ builder .setJdbcParameterValuesProvider (allParams );
157
169
158
170
predicates .add (
159
171
dialect .quoteIdentifier (readOptions .getPartitionColumnName ().get ())
160
172
+ " BETWEEN ? AND ?" );
161
173
} else {
162
- builder .setParametersProvider (
174
+ builder .setJdbcParameterValuesProvider (
163
175
new JdbcGenericParameterValuesProvider (replicatePushdownParamsForN (1 )));
164
176
}
165
177
166
178
predicates .addAll (this .resolvedPredicates );
167
179
168
- if (predicates .size () > 0 ) {
180
+ if (! predicates .isEmpty () ) {
169
181
String joinedConditions =
170
182
predicates .stream ()
171
183
.map (pred -> String .format ("(%s)" , pred ))
@@ -179,13 +191,16 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
179
191
180
192
LOG .debug ("Query generated for JDBC scan: " + query );
181
193
182
- builder .setQuery (query );
194
+ builder .setSql (query );
183
195
final RowType rowType = (RowType ) physicalRowDataType .getLogicalType ();
184
- builder .setRowConverter (dialect .getRowConverter (rowType ));
185
- builder .setRowDataTypeInfo (
186
- runtimeProviderContext .createTypeInformation (physicalRowDataType ));
187
-
188
- return InputFormatProvider .of (builder .build ());
196
+ builder .setResultExtractor (new RowDataResultExtractor (dialect .getRowConverter (rowType )));
197
+ builder .setTypeInformation (scanContext .createTypeInformation (physicalRowDataType ));
198
+ options .getProperties ()
199
+ .forEach (
200
+ (key , value ) ->
201
+ builder .setConnectionProperty (key .toString (), value .toString ()));
202
+ JdbcSource <RowData > source = builder .build ();
203
+ return new JdbcDataStreamScanProvider (source );
189
204
}
190
205
191
206
@ Override
@@ -295,4 +310,30 @@ private Serializable[][] replicatePushdownParamsForN(int n) {
295
310
}
296
311
return allPushdownParams ;
297
312
}
313
+
314
+ private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
315
+
316
+ private final JdbcSource <RowData > source ;
317
+
318
+ public JdbcDataStreamScanProvider (JdbcSource <RowData > source ) {
319
+ this .source = Preconditions .checkNotNull (source );
320
+ }
321
+
322
+ @ Override
323
+ public DataStream <RowData > produceDataStream (
324
+ ProviderContext providerContext , StreamExecutionEnvironment execEnv ) {
325
+ DataStreamSource <RowData > sourceStream =
326
+ execEnv .fromSource (
327
+ source ,
328
+ WatermarkStrategy .noWatermarks (),
329
+ JdbcDynamicTableSource .class .getSimpleName ());
330
+ providerContext .generateUid (JDBC_TRANSFORMATION ).ifPresent (sourceStream ::uid );
331
+ return sourceStream ;
332
+ }
333
+
334
+ @ Override
335
+ public boolean isBounded () {
336
+ return source .getBoundedness () == Boundedness .BOUNDED ;
337
+ }
338
+ }
298
339
}
0 commit comments