Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 2a54048

Browse files
committed May 17, 2024 ·
[FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source
1 parent bde28e6 commit 2a54048

19 files changed

+908
-121
lines changed
 

‎flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java

+21-20
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@
4040
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
4141
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
4242
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
43+
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
4344
import org.apache.flink.core.io.SimpleVersionedSerializer;
4445
import org.apache.flink.util.Preconditions;
4546

47+
import javax.annotation.Nullable;
48+
4649
import java.io.Serializable;
4750
import java.util.ArrayList;
4851
import java.util.Objects;
@@ -55,6 +58,7 @@ public class JdbcSource<OUT>
5558

5659
private final Boundedness boundedness;
5760
private final TypeInformation<OUT> typeInformation;
61+
private final @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
5862

5963
private final Configuration configuration;
6064
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
@@ -69,29 +73,20 @@ public class JdbcSource<OUT>
6973
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
7074
ResultExtractor<OUT> resultExtractor,
7175
TypeInformation<OUT> typeInformation,
72-
DeliveryGuarantee deliveryGuarantee) {
76+
@Nullable DeliveryGuarantee deliveryGuarantee,
77+
@Nullable ContinuousEnumerationSettings continuousEnumerationSettings) {
7378
this.configuration = Preconditions.checkNotNull(configuration);
7479
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
7580
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
7681
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
77-
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
82+
this.deliveryGuarantee =
83+
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
7884
this.typeInformation = Preconditions.checkNotNull(typeInformation);
79-
this.boundedness = Boundedness.BOUNDED;
80-
}
81-
82-
JdbcSource(
83-
Configuration configuration,
84-
JdbcConnectionProvider connectionProvider,
85-
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
86-
ResultExtractor<OUT> resultExtractor,
87-
TypeInformation<OUT> typeInformation) {
88-
this(
89-
configuration,
90-
connectionProvider,
91-
sqlSplitEnumeratorProvider,
92-
resultExtractor,
93-
typeInformation,
94-
DeliveryGuarantee.NONE);
85+
this.continuousEnumerationSettings = continuousEnumerationSettings;
86+
this.boundedness =
87+
Objects.isNull(continuousEnumerationSettings)
88+
? Boundedness.BOUNDED
89+
: Boundedness.CONTINUOUS_UNBOUNDED;
9590
}
9691

9792
@Override
@@ -119,7 +114,10 @@ public SourceReader<OUT, JdbcSourceSplit> createReader(SourceReaderContext reade
119114
public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumerator(
120115
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
121116
return new JdbcSourceEnumerator(
122-
enumContext, sqlSplitEnumeratorProvider.create(), new ArrayList<>());
117+
enumContext,
118+
sqlSplitEnumeratorProvider.create(),
119+
continuousEnumerationSettings,
120+
new ArrayList<>());
123121
}
124122

125123
@Override
@@ -132,6 +130,7 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
132130
return new JdbcSourceEnumerator(
133131
enumContext,
134132
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
133+
continuousEnumerationSettings,
135134
checkpoint.getRemainingSplits());
136135
}
137136

@@ -193,6 +192,8 @@ public boolean equals(Object o) {
193192
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
194193
&& Objects.equals(connectionProvider, that.connectionProvider)
195194
&& Objects.equals(resultExtractor, that.resultExtractor)
196-
&& deliveryGuarantee == that.deliveryGuarantee;
195+
&& deliveryGuarantee == that.deliveryGuarantee
196+
&& Objects.equals(
197+
continuousEnumerationSettings, that.continuousEnumerationSettings);
197198
}
198199
}

‎flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+72-10
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
2929
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
3030
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
31+
import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
32+
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
3133
import org.apache.flink.types.Row;
3234
import org.apache.flink.util.Preconditions;
3335
import org.apache.flink.util.StringUtils;
@@ -36,9 +38,13 @@
3638
import org.slf4j.LoggerFactory;
3739

3840
import javax.annotation.Nonnull;
41+
import javax.annotation.Nullable;
3942

43+
import java.io.Serializable;
4044
import java.sql.DriverManager;
4145
import java.sql.PreparedStatement;
46+
import java.sql.ResultSet;
47+
import java.util.Objects;
4248

4349
import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
4450
import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
@@ -94,6 +100,11 @@ public class JdbcSourceBuilder<OUT> {
94100

95101
public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class);
96102

103+
public static final String INVALID_CONTINUOUS_SLIDE_TIMING_HINT =
104+
"The 'jdbcParameterValuesProvider' must be specified with in type of 'JdbcSlideTimingParameterProvider' when using 'continuousEnumerationSettings'.";
105+
public static final String INVALID_SLIDE_TIMING_CONTINUOUS_HINT =
106+
"The 'continuousEnumerationSettings' must be specified with in type of 'ContinuousEnumerationSettings' when using 'jdbcParameterValuesProvider' in type of 'JdbcSlideTimingParameterProvider'.";
107+
97108
private final Configuration configuration;
98109

99110
private int splitReaderFetchBatchSize;
@@ -103,15 +114,15 @@ public class JdbcSourceBuilder<OUT> {
103114
// Boolean to distinguish between default value and explicitly set autoCommit mode.
104115
private Boolean autoCommit;
105116

106-
// TODO It would be used to introduce streaming semantic and tracked in
107-
// https://issues.apache.org/jira/browse/FLINK-33461
108117
private DeliveryGuarantee deliveryGuarantee;
118+
private @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
109119

110120
private TypeInformation<OUT> typeInformation;
111121

112122
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
113123
private String sql;
114124
private JdbcParameterValuesProvider jdbcParameterValuesProvider;
125+
private @Nullable Serializable optionalSqlSplitEnumeratorState;
115126
private ResultExtractor<OUT> resultExtractor;
116127

117128
private JdbcConnectionProvider connectionProvider;
@@ -177,6 +188,32 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
177188

178189
// ------ Optional ------------------------------------------------------------------
179190

191+
/**
192+
* The continuousEnumerationSettings used to discover the next available batch of splits. Note: If the
193+
* value is set, the {@link #jdbcParameterValuesProvider} must be specified as a {@link
194+
* org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}.
195+
*/
196+
public JdbcSourceBuilder<OUT> setContinuousEnumerationSettings(
197+
ContinuousEnumerationSettings continuousEnumerationSettings) {
198+
this.continuousEnumerationSettings = continuousEnumerationSettings;
199+
return this;
200+
}
201+
202+
/**
203+
* If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's
204+
* required to specify the {@link #continuousEnumerationSettings}.
205+
*/
206+
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
207+
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
208+
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
209+
return this;
210+
}
211+
212+
public JdbcSourceBuilder<OUT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
213+
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
214+
return this;
215+
}
216+
180217
public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
181218
int connectionCheckTimeoutSeconds) {
182219
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
@@ -190,12 +227,6 @@ public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propV
190227
return this;
191228
}
192229

193-
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
194-
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
195-
this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider);
196-
return this;
197-
}
198-
199230
public JdbcSourceBuilder<OUT> setSplitReaderFetchBatchSize(int splitReaderFetchBatchSize) {
200231
Preconditions.checkArgument(
201232
splitReaderFetchBatchSize > 0,
@@ -235,11 +266,26 @@ public JdbcSourceBuilder<OUT> setConnectionProvider(
235266
return this;
236267
}
237268

269+
public JdbcSourceBuilder<OUT> setOptionalSqlSplitEnumeratorState(
270+
Serializable optionalSqlSplitEnumeratorState) {
271+
this.optionalSqlSplitEnumeratorState = optionalSqlSplitEnumeratorState;
272+
return this;
273+
}
274+
238275
public JdbcSource<OUT> build() {
239276
this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
240277
if (resultSetFetchSize > 0) {
241278
this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize);
242279
}
280+
281+
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
282+
Preconditions.checkArgument(
283+
this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
284+
|| this.resultSetType == ResultSet.CONCUR_READ_ONLY,
285+
"The 'resultSetType' must be ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s",
286+
DeliveryGuarantee.EXACTLY_ONCE);
287+
}
288+
243289
this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency);
244290
this.configuration.set(RESULTSET_TYPE, resultSetType);
245291
this.configuration.set(READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize);
@@ -250,15 +296,31 @@ public JdbcSource<OUT> build() {
250296
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
251297
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
252298

299+
if (Objects.nonNull(continuousEnumerationSettings)) {
300+
Preconditions.checkArgument(
301+
Objects.nonNull(jdbcParameterValuesProvider)
302+
&& jdbcParameterValuesProvider
303+
instanceof JdbcSlideTimingParameterProvider,
304+
INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
305+
}
306+
307+
if (Objects.nonNull(jdbcParameterValuesProvider)
308+
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
309+
Preconditions.checkArgument(
310+
Objects.nonNull(continuousEnumerationSettings),
311+
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
312+
}
313+
253314
return new JdbcSource<>(
254315
configuration,
255316
connectionProvider,
256317
new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
257-
.setOptionalSqlSplitEnumeratorState(null)
318+
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
258319
.setSqlTemplate(sql)
259320
.setParameterValuesProvider(jdbcParameterValuesProvider),
260321
resultExtractor,
261322
typeInformation,
262-
deliveryGuarantee);
323+
deliveryGuarantee,
324+
continuousEnumerationSettings);
263325
}
264326
}

‎flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java

+66-11
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.connector.source.SplitEnumerator;
2424
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2525
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
26+
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
2627
import org.apache.flink.util.Preconditions;
2728

2829
import org.slf4j.Logger;
@@ -34,7 +35,10 @@
3435
import java.util.ArrayList;
3536
import java.util.Collections;
3637
import java.util.Iterator;
38+
import java.util.LinkedHashMap;
3739
import java.util.List;
40+
import java.util.Map;
41+
import java.util.Objects;
3842
import java.util.Optional;
3943

4044
/** JDBC source enumerator. */
@@ -44,26 +48,42 @@ public class JdbcSourceEnumerator
4448

4549
private final SplitEnumeratorContext<JdbcSourceSplit> context;
4650
private final Boundedness boundedness;
51+
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
4752
private final List<JdbcSourceSplit> unassigned;
4853
private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator;
54+
private final @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
4955

5056
public JdbcSourceEnumerator(
5157
SplitEnumeratorContext<JdbcSourceSplit> context,
5258
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
59+
ContinuousEnumerationSettings continuousEnumerationSettings,
5360
List<JdbcSourceSplit> unassigned) {
5461
this.context = Preconditions.checkNotNull(context);
5562
this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator);
56-
this.boundedness = Boundedness.BOUNDED;
63+
this.continuousEnumerationSettings = continuousEnumerationSettings;
64+
this.boundedness =
65+
Objects.isNull(continuousEnumerationSettings)
66+
? Boundedness.BOUNDED
67+
: Boundedness.CONTINUOUS_UNBOUNDED;
5768
this.unassigned = Preconditions.checkNotNull(unassigned);
69+
this.readersAwaitingSplit = new LinkedHashMap<>();
5870
}
5971

6072
@Override
6173
public void start() {
6274
sqlSplitEnumerator.open();
63-
try {
64-
unassigned.addAll(sqlSplitEnumerator.enumerateSplits());
65-
} catch (IOException e) {
66-
throw new RuntimeException(e);
75+
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
76+
context.callAsync(
77+
() -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
78+
this::processNewSplits,
79+
continuousEnumerationSettings.getDiscoveryInterval().toMillis(),
80+
continuousEnumerationSettings.getDiscoveryInterval().toMillis());
81+
} else {
82+
try {
83+
unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true));
84+
} catch (IOException e) {
85+
throw new RuntimeException(e);
86+
}
6787
}
6888
}
6989

@@ -81,6 +101,9 @@ public void addReader(int subtaskId) {
81101
public void handleSplitRequest(int subtask, @Nullable String hostname) {
82102
if (boundedness == Boundedness.BOUNDED) {
83103
assignSplitsForBounded(subtask, hostname);
104+
} else {
105+
readersAwaitingSplit.put(subtask, hostname);
106+
assignSplitsForUnbounded();
84107
}
85108
}
86109

@@ -110,12 +133,9 @@ private Optional<JdbcSourceSplit> getNextSplit() {
110133
return Optional.empty();
111134
}
112135
Iterator<JdbcSourceSplit> iterator = unassigned.iterator();
113-
JdbcSourceSplit next = null;
114-
if (iterator.hasNext()) {
115-
next = iterator.next();
116-
iterator.remove();
117-
}
118-
return Optional.ofNullable(next);
136+
JdbcSourceSplit next = iterator.next();
137+
iterator.remove();
138+
return Optional.of(next);
119139
}
120140

121141
private void assignSplitsForBounded(int subtask, @Nullable String hostname) {
@@ -137,4 +157,39 @@ private void assignSplitsForBounded(int subtask, @Nullable String hostname) {
137157
LOG.info("No more splits available for subtask {}", subtask);
138158
}
139159
}
160+
161+
private void processNewSplits(List<JdbcSourceSplit> splits, Throwable error) {
162+
if (error != null) {
163+
LOG.error("Failed to enumerate sql splits.", error);
164+
return;
165+
}
166+
this.unassigned.addAll(splits);
167+
168+
assignSplitsForUnbounded();
169+
}
170+
171+
private void assignSplitsForUnbounded() {
172+
final Iterator<Map.Entry<Integer, String>> awaitingReader =
173+
readersAwaitingSplit.entrySet().iterator();
174+
175+
while (awaitingReader.hasNext()) {
176+
final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
177+
178+
// if the reader that requested another split has failed in the meantime, remove
179+
// it from the list of waiting readers
180+
if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
181+
awaitingReader.remove();
182+
continue;
183+
}
184+
185+
final int awaitingSubtask = nextAwaiting.getKey();
186+
final Optional<JdbcSourceSplit> nextSplit = getNextSplit();
187+
if (nextSplit.isPresent()) {
188+
context.assignSplit(nextSplit.get(), awaitingSubtask);
189+
awaitingReader.remove();
190+
} else {
191+
break;
192+
}
193+
}
194+
}
140195
}

0 commit comments

Comments
 (0)
Please sign in to comment.