19
19
package org .apache .flink .connector .jdbc .table ;
20
20
21
21
import org .apache .flink .annotation .Internal ;
22
+ import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
23
+ import org .apache .flink .api .connector .source .Boundedness ;
22
24
import org .apache .flink .connector .jdbc .dialect .JdbcDialect ;
23
25
import org .apache .flink .connector .jdbc .internal .options .InternalJdbcConnectionOptions ;
24
26
import org .apache .flink .connector .jdbc .internal .options .JdbcReadOptions ;
27
+ import org .apache .flink .connector .jdbc .source .JdbcSource ;
28
+ import org .apache .flink .connector .jdbc .source .JdbcSourceBuilder ;
25
29
import org .apache .flink .connector .jdbc .split .CompositeJdbcParameterValuesProvider ;
26
30
import org .apache .flink .connector .jdbc .split .JdbcGenericParameterValuesProvider ;
27
31
import org .apache .flink .connector .jdbc .split .JdbcNumericBetweenParametersProvider ;
28
32
import org .apache .flink .connector .jdbc .split .JdbcParameterValuesProvider ;
33
+ import org .apache .flink .streaming .api .datastream .DataStream ;
34
+ import org .apache .flink .streaming .api .datastream .DataStreamSource ;
35
+ import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
29
36
import org .apache .flink .table .connector .ChangelogMode ;
30
37
import org .apache .flink .table .connector .Projection ;
38
+ import org .apache .flink .table .connector .ProviderContext ;
39
+ import org .apache .flink .table .connector .source .DataStreamScanProvider ;
31
40
import org .apache .flink .table .connector .source .DynamicTableSource ;
32
- import org .apache .flink .table .connector .source .InputFormatProvider ;
33
41
import org .apache .flink .table .connector .source .LookupTableSource ;
34
42
import org .apache .flink .table .connector .source .ScanTableSource ;
35
43
import org .apache .flink .table .connector .source .abilities .SupportsFilterPushDown ;
38
46
import org .apache .flink .table .connector .source .lookup .LookupFunctionProvider ;
39
47
import org .apache .flink .table .connector .source .lookup .PartialCachingLookupProvider ;
40
48
import org .apache .flink .table .connector .source .lookup .cache .LookupCache ;
49
+ import org .apache .flink .table .data .RowData ;
41
50
import org .apache .flink .table .expressions .CallExpression ;
42
51
import org .apache .flink .table .expressions .ResolvedExpression ;
43
52
import org .apache .flink .table .types .DataType ;
@@ -68,6 +77,8 @@ public class JdbcDynamicTableSource
68
77
SupportsFilterPushDown {
69
78
private static final Logger LOG = LoggerFactory .getLogger (JdbcDynamicTableSource .class );
70
79
80
+ private static final String JDBC_TRANSFORMATION = "jdbc" ;
81
+
71
82
private final InternalJdbcConnectionOptions options ;
72
83
private final JdbcReadOptions readOptions ;
73
84
private final int lookupMaxRetryTimes ;
@@ -77,19 +88,33 @@ public class JdbcDynamicTableSource
77
88
private long limit = -1 ;
78
89
private List <String > resolvedPredicates = new ArrayList <>();
79
90
private Serializable [] pushdownParams = new Serializable [0 ];
91
+ // The Nullable for iterative.
92
+ @ Nullable protected final String tableIdentifier ;
80
93
94
+ @ Deprecated
81
95
public JdbcDynamicTableSource (
82
96
InternalJdbcConnectionOptions options ,
83
97
JdbcReadOptions readOptions ,
84
98
int lookupMaxRetryTimes ,
85
99
@ Nullable LookupCache cache ,
86
100
DataType physicalRowDataType ) {
101
+ this (options , readOptions , lookupMaxRetryTimes , cache , physicalRowDataType , null );
102
+ }
103
+
104
+ public JdbcDynamicTableSource (
105
+ InternalJdbcConnectionOptions options ,
106
+ JdbcReadOptions readOptions ,
107
+ int lookupMaxRetryTimes ,
108
+ @ Nullable LookupCache cache ,
109
+ DataType physicalRowDataType ,
110
+ String tableIdentifier ) {
87
111
this .options = options ;
88
112
this .readOptions = readOptions ;
89
113
this .lookupMaxRetryTimes = lookupMaxRetryTimes ;
90
114
this .cache = cache ;
91
115
this .physicalRowDataType = physicalRowDataType ;
92
116
this .dialectName = options .getDialect ().dialectName ();
117
+ this .tableIdentifier = tableIdentifier ;
93
118
}
94
119
95
120
@ Override
@@ -121,17 +146,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
121
146
}
122
147
123
148
@ Override
124
- public ScanRuntimeProvider getScanRuntimeProvider (ScanContext runtimeProviderContext ) {
125
- final JdbcRowDataInputFormat .Builder builder =
126
- JdbcRowDataInputFormat .builder ()
127
- .setDrivername (options .getDriverName ())
149
+ public ScanRuntimeProvider getScanRuntimeProvider (ScanContext scanContext ) {
150
+
151
+ final JdbcSourceBuilder <RowData > builder =
152
+ JdbcSource .<RowData >builder ()
153
+ .setDriverName (options .getDriverName ())
128
154
.setDBUrl (options .getDbURL ())
129
155
.setUsername (options .getUsername ().orElse (null ))
130
156
.setPassword (options .getPassword ().orElse (null ))
131
157
.setAutoCommit (readOptions .getAutoCommit ());
132
158
133
159
if (readOptions .getFetchSize () != 0 ) {
134
- builder .setFetchSize (readOptions .getFetchSize ());
160
+ builder .setResultSetFetchSize (readOptions .getFetchSize ());
135
161
}
136
162
final JdbcDialect dialect = options .getDialect ();
137
163
String query =
@@ -153,13 +179,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
153
179
.ofBatchNum (numPartitions ),
154
180
new JdbcGenericParameterValuesProvider (allPushdownParams ));
155
181
156
- builder .setParametersProvider (allParams );
182
+ builder .setJdbcParameterValuesProvider (allParams );
157
183
158
184
predicates .add (
159
185
dialect .quoteIdentifier (readOptions .getPartitionColumnName ().get ())
160
186
+ " BETWEEN ? AND ?" );
161
187
} else {
162
- builder .setParametersProvider (
188
+ builder .setJdbcParameterValuesProvider (
163
189
new JdbcGenericParameterValuesProvider (replicatePushdownParamsForN (1 )));
164
190
}
165
191
@@ -179,13 +205,34 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
179
205
180
206
LOG .debug ("Query generated for JDBC scan: " + query );
181
207
182
- builder .setQuery (query );
208
+ builder .setSql (query );
183
209
final RowType rowType = (RowType ) physicalRowDataType .getLogicalType ();
184
- builder .setRowConverter (dialect .getRowConverter (rowType ));
185
- builder .setRowDataTypeInfo (
186
- runtimeProviderContext .createTypeInformation (physicalRowDataType ));
210
+ builder .setResultExtractor (new RowDataResultExtractor (dialect .getRowConverter (rowType )));
211
+ builder .setTypeInformation (scanContext .createTypeInformation (physicalRowDataType ));
212
+ options .getProperties ()
213
+ .forEach (
214
+ (key , value ) ->
215
+ builder .setConnectionProperty (key .toString (), value .toString ()));
216
+ JdbcSource <RowData > source = builder .build ();
217
+ return new DataStreamScanProvider () {
218
+ @ Override
219
+ public DataStream <RowData > produceDataStream (
220
+ ProviderContext providerContext , StreamExecutionEnvironment execEnv ) {
221
+ String sourceName =
222
+ Objects .isNull (tableIdentifier )
223
+ ? "JdbcSource"
224
+ : "JdbcSource-" + tableIdentifier ;
225
+ DataStreamSource <RowData > sourceStream =
226
+ execEnv .fromSource (source , WatermarkStrategy .noWatermarks (), sourceName );
227
+ providerContext .generateUid (JDBC_TRANSFORMATION ).ifPresent (sourceStream ::uid );
228
+ return sourceStream ;
229
+ }
187
230
188
- return InputFormatProvider .of (builder .build ());
231
+ @ Override
232
+ public boolean isBounded () {
233
+ return source .getBoundedness () == Boundedness .BOUNDED ;
234
+ }
235
+ };
189
236
}
190
237
191
238
@ Override
@@ -208,7 +255,12 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
208
255
public DynamicTableSource copy () {
209
256
JdbcDynamicTableSource newSource =
210
257
new JdbcDynamicTableSource (
211
- options , readOptions , lookupMaxRetryTimes , cache , physicalRowDataType );
258
+ options ,
259
+ readOptions ,
260
+ lookupMaxRetryTimes ,
261
+ cache ,
262
+ physicalRowDataType ,
263
+ tableIdentifier );
212
264
newSource .resolvedPredicates = new ArrayList <>(this .resolvedPredicates );
213
265
newSource .pushdownParams = Arrays .copyOf (this .pushdownParams , this .pushdownParams .length );
214
266
return newSource ;
@@ -236,7 +288,8 @@ public boolean equals(Object o) {
236
288
&& Objects .equals (dialectName , that .dialectName )
237
289
&& Objects .equals (limit , that .limit )
238
290
&& Objects .equals (resolvedPredicates , that .resolvedPredicates )
239
- && Arrays .deepEquals (pushdownParams , that .pushdownParams );
291
+ && Arrays .deepEquals (pushdownParams , that .pushdownParams )
292
+ && Objects .equals (tableIdentifier , that .tableIdentifier );
240
293
}
241
294
242
295
@ Override
@@ -250,7 +303,8 @@ public int hashCode() {
250
303
dialectName ,
251
304
limit ,
252
305
resolvedPredicates ,
253
- pushdownParams );
306
+ pushdownParams ,
307
+ tableIdentifier );
254
308
}
255
309
256
310
@ Override
0 commit comments