Skip to content

Commit e14152d

Browse files
committed
[FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source
1 parent bde28e6 commit e14152d

19 files changed

+943
-130
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java

+21-20
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@
4040
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
4141
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
4242
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
43+
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
4344
import org.apache.flink.core.io.SimpleVersionedSerializer;
4445
import org.apache.flink.util.Preconditions;
4546

47+
import javax.annotation.Nullable;
48+
4649
import java.io.Serializable;
4750
import java.util.ArrayList;
4851
import java.util.Objects;
@@ -55,6 +58,7 @@ public class JdbcSource<OUT>
5558

5659
private final Boundedness boundedness;
5760
private final TypeInformation<OUT> typeInformation;
61+
private final @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
5862

5963
private final Configuration configuration;
6064
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
@@ -69,29 +73,20 @@ public class JdbcSource<OUT>
6973
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
7074
ResultExtractor<OUT> resultExtractor,
7175
TypeInformation<OUT> typeInformation,
72-
DeliveryGuarantee deliveryGuarantee) {
76+
@Nullable DeliveryGuarantee deliveryGuarantee,
77+
@Nullable ContinuousEnumerationSettings continuousEnumerationSettings) {
7378
this.configuration = Preconditions.checkNotNull(configuration);
7479
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
7580
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
7681
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
77-
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
82+
this.deliveryGuarantee =
83+
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
7884
this.typeInformation = Preconditions.checkNotNull(typeInformation);
79-
this.boundedness = Boundedness.BOUNDED;
80-
}
81-
82-
JdbcSource(
83-
Configuration configuration,
84-
JdbcConnectionProvider connectionProvider,
85-
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
86-
ResultExtractor<OUT> resultExtractor,
87-
TypeInformation<OUT> typeInformation) {
88-
this(
89-
configuration,
90-
connectionProvider,
91-
sqlSplitEnumeratorProvider,
92-
resultExtractor,
93-
typeInformation,
94-
DeliveryGuarantee.NONE);
85+
this.continuousEnumerationSettings = continuousEnumerationSettings;
86+
this.boundedness =
87+
Objects.isNull(continuousEnumerationSettings)
88+
? Boundedness.BOUNDED
89+
: Boundedness.CONTINUOUS_UNBOUNDED;
9590
}
9691

9792
@Override
@@ -119,7 +114,10 @@ public SourceReader<OUT, JdbcSourceSplit> createReader(SourceReaderContext reade
119114
public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumerator(
120115
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
121116
return new JdbcSourceEnumerator(
122-
enumContext, sqlSplitEnumeratorProvider.create(), new ArrayList<>());
117+
enumContext,
118+
sqlSplitEnumeratorProvider.create(),
119+
continuousEnumerationSettings,
120+
new ArrayList<>());
123121
}
124122

125123
@Override
@@ -132,6 +130,7 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
132130
return new JdbcSourceEnumerator(
133131
enumContext,
134132
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
133+
continuousEnumerationSettings,
135134
checkpoint.getRemainingSplits());
136135
}
137136

@@ -193,6 +192,8 @@ public boolean equals(Object o) {
193192
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
194193
&& Objects.equals(connectionProvider, that.connectionProvider)
195194
&& Objects.equals(resultExtractor, that.resultExtractor)
196-
&& deliveryGuarantee == that.deliveryGuarantee;
195+
&& deliveryGuarantee == that.deliveryGuarantee
196+
&& Objects.equals(
197+
continuousEnumerationSettings, that.continuousEnumerationSettings);
197198
}
198199
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+72-10
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
2929
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
3030
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
31+
import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
32+
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
3133
import org.apache.flink.types.Row;
3234
import org.apache.flink.util.Preconditions;
3335
import org.apache.flink.util.StringUtils;
@@ -36,9 +38,13 @@
3638
import org.slf4j.LoggerFactory;
3739

3840
import javax.annotation.Nonnull;
41+
import javax.annotation.Nullable;
3942

43+
import java.io.Serializable;
4044
import java.sql.DriverManager;
4145
import java.sql.PreparedStatement;
46+
import java.sql.ResultSet;
47+
import java.util.Objects;
4248

4349
import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
4450
import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
@@ -94,6 +100,11 @@ public class JdbcSourceBuilder<OUT> {
94100

95101
public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class);
96102

103+
public static final String INVALID_CONTINUOUS_SLIDE_TIMING_HINT =
104+
"The 'jdbcParameterValuesProvider' must be specified with in type of 'JdbcSlideTimingParameterProvider' when using 'continuousEnumerationSettings'.";
105+
public static final String INVALID_SLIDE_TIMING_CONTINUOUS_HINT =
106+
"The 'continuousEnumerationSettings' must be specified with in type of 'ContinuousEnumerationSettings' when using 'jdbcParameterValuesProvider' in type of 'JdbcSlideTimingParameterProvider'.";
107+
97108
private final Configuration configuration;
98109

99110
private int splitReaderFetchBatchSize;
@@ -103,15 +114,15 @@ public class JdbcSourceBuilder<OUT> {
103114
// Boolean to distinguish between default value and explicitly set autoCommit mode.
104115
private Boolean autoCommit;
105116

106-
// TODO It would be used to introduce streaming semantic and tracked in
107-
// https://issues.apache.org/jira/browse/FLINK-33461
108117
private DeliveryGuarantee deliveryGuarantee;
118+
private @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
109119

110120
private TypeInformation<OUT> typeInformation;
111121

112122
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
113123
private String sql;
114124
private JdbcParameterValuesProvider jdbcParameterValuesProvider;
125+
private @Nullable Serializable optionalSqlSplitEnumeratorState;
115126
private ResultExtractor<OUT> resultExtractor;
116127

117128
private JdbcConnectionProvider connectionProvider;
@@ -177,6 +188,32 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
177188

178189
// ------ Optional ------------------------------------------------------------------
179190

191+
/**
192+
* The continuousEnumerationSettings to discovery the next available batch splits. Note: If the
193+
* value was set, the {@link #jdbcParameterValuesProvider} must specified with the {@link
194+
* org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}.
195+
*/
196+
public JdbcSourceBuilder<OUT> setContinuousEnumerationSettings(
197+
ContinuousEnumerationSettings continuousEnumerationSettings) {
198+
this.continuousEnumerationSettings = continuousEnumerationSettings;
199+
return this;
200+
}
201+
202+
/**
203+
* If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's
204+
* required to specify the {@link #continuousEnumerationSettings}.
205+
*/
206+
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
207+
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
208+
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
209+
return this;
210+
}
211+
212+
public JdbcSourceBuilder<OUT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
213+
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
214+
return this;
215+
}
216+
180217
public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
181218
int connectionCheckTimeoutSeconds) {
182219
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
@@ -190,12 +227,6 @@ public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propV
190227
return this;
191228
}
192229

193-
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
194-
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
195-
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
196-
return this;
197-
}
198-
199230
public JdbcSourceBuilder<OUT> setSplitReaderFetchBatchSize(int splitReaderFetchBatchSize) {
200231
Preconditions.checkArgument(
201232
splitReaderFetchBatchSize > 0,
@@ -235,11 +266,26 @@ public JdbcSourceBuilder<OUT> setConnectionProvider(
235266
return this;
236267
}
237268

269+
public JdbcSourceBuilder<OUT> setOptionalSqlSplitEnumeratorState(
270+
Serializable optionalSqlSplitEnumeratorState) {
271+
this.optionalSqlSplitEnumeratorState = optionalSqlSplitEnumeratorState;
272+
return this;
273+
}
274+
238275
public JdbcSource<OUT> build() {
239276
this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
240277
if (resultSetFetchSize > 0) {
241278
this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize);
242279
}
280+
281+
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
282+
Preconditions.checkArgument(
283+
this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
284+
|| this.resultSetType == ResultSet.CONCUR_READ_ONLY,
285+
"The 'resultSetType' must be ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s",
286+
DeliveryGuarantee.EXACTLY_ONCE);
287+
}
288+
243289
this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency);
244290
this.configuration.set(RESULTSET_TYPE, resultSetType);
245291
this.configuration.set(READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
@@ -250,15 +296,31 @@ public JdbcSource<OUT> build() {
250296
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
251297
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
252298

299+
if (Objects.nonNull(continuousEnumerationSettings)) {
300+
Preconditions.checkArgument(
301+
Objects.nonNull(jdbcParameterValuesProvider)
302+
&& jdbcParameterValuesProvider
303+
instanceof JdbcSlideTimingParameterProvider,
304+
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
305+
}
306+
307+
if (Objects.nonNull(jdbcParameterValuesProvider)
308+
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
309+
Preconditions.checkArgument(
310+
Objects.nonNull(continuousEnumerationSettings),
311+
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
312+
}
313+
253314
return new JdbcSource<>(
254315
configuration,
255316
connectionProvider,
256317
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
257-
.setOptionalSqlSplitEnumeratorState(null)
318+
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
258319
.setSqlTemplate(sql)
259320
.setParameterValuesProvider(jdbcParameterValuesProvider),
260321
resultExtractor,
261322
typeInformation,
262-
deliveryGuarantee);
323+
deliveryGuarantee,
324+
continuousEnumerationSettings);
263325
}
264326
}

0 commit comments

Comments
 (0)