Skip to content

Commit e00fe68

Browse files
committed
Updated based on review comments from Boto.
1 parent e14152d commit e00fe68

File tree

8 files changed

+71
-51
lines changed

8 files changed

+71
-51
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
4141
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
4242
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
43-
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
43+
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
4444
import org.apache.flink.core.io.SimpleVersionedSerializer;
4545
import org.apache.flink.util.Preconditions;
4646

@@ -58,7 +58,7 @@ public class JdbcSource<OUT>
5858

5959
private final Boundedness boundedness;
6060
private final TypeInformation<OUT> typeInformation;
61-
private final @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
61+
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
6262

6363
private final Configuration configuration;
6464
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
@@ -74,17 +74,17 @@ public class JdbcSource<OUT>
7474
ResultExtractor<OUT> resultExtractor,
7575
TypeInformation<OUT> typeInformation,
7676
@Nullable DeliveryGuarantee deliveryGuarantee,
77-
@Nullable ContinuousEnumerationSettings continuousEnumerationSettings) {
77+
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
7878
this.configuration = Preconditions.checkNotNull(configuration);
7979
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
8080
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
8181
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
8282
this.deliveryGuarantee =
8383
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
8484
this.typeInformation = Preconditions.checkNotNull(typeInformation);
85-
this.continuousEnumerationSettings = continuousEnumerationSettings;
85+
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
8686
this.boundedness =
87-
Objects.isNull(continuousEnumerationSettings)
87+
Objects.isNull(continuousUnBoundingSettings)
8888
? Boundedness.BOUNDED
8989
: Boundedness.CONTINUOUS_UNBOUNDED;
9090
}
@@ -116,7 +116,7 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumera
116116
return new JdbcSourceEnumerator(
117117
enumContext,
118118
sqlSplitEnumeratorProvider.create(),
119-
continuousEnumerationSettings,
119+
continuousUnBoundingSettings,
120120
new ArrayList<>());
121121
}
122122

@@ -130,7 +130,7 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
130130
return new JdbcSourceEnumerator(
131131
enumContext,
132132
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
133-
continuousEnumerationSettings,
133+
continuousUnBoundingSettings,
134134
checkpoint.getRemainingSplits());
135135
}
136136

@@ -193,7 +193,6 @@ public boolean equals(Object o) {
193193
&& Objects.equals(connectionProvider, that.connectionProvider)
194194
&& Objects.equals(resultExtractor, that.resultExtractor)
195195
&& deliveryGuarantee == that.deliveryGuarantee
196-
&& Objects.equals(
197-
continuousEnumerationSettings, that.continuousEnumerationSettings);
196+
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
198197
}
199198
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
3030
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3131
import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
32-
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
32+
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
3333
import org.apache.flink.types.Row;
3434
import org.apache.flink.util.Preconditions;
3535
import org.apache.flink.util.StringUtils;
@@ -101,9 +101,9 @@ public class JdbcSourceBuilder<OUT> {
101101
public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class);
102102

103103
public static final String INVALID_CONTINUOUS_SLIDE_TIMING_HINT =
104-
"The 'jdbcParameterValuesProvider' must be specified with in type of 'JdbcSlideTimingParameterProvider' when using 'continuousEnumerationSettings'.";
104+
"The 'jdbcParameterValuesProvider' must be specified with in type of 'JdbcSlideTimingParameterProvider' when using 'continuousUnBoundingSettings'.";
105105
public static final String INVALID_SLIDE_TIMING_CONTINUOUS_HINT =
106-
"The 'continuousEnumerationSettings' must be specified with in type of 'ContinuousEnumerationSettings' when using 'jdbcParameterValuesProvider' in type of 'JdbcSlideTimingParameterProvider'.";
106+
"The 'continuousUnBoundingSettings' must be specified with in type of 'continuousUnBoundingSettings' when using 'jdbcParameterValuesProvider' in type of 'JdbcSlideTimingParameterProvider'.";
107107

108108
private final Configuration configuration;
109109

@@ -115,7 +115,7 @@ public class JdbcSourceBuilder<OUT> {
115115
private Boolean autoCommit;
116116

117117
private DeliveryGuarantee deliveryGuarantee;
118-
private @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
118+
private @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
119119

120120
private TypeInformation<OUT> typeInformation;
121121

@@ -189,19 +189,19 @@ public JdbcSourceBuilder<OUT> setTypeInformation(
189189
// ------ Optional ------------------------------------------------------------------
190190

191191
/**
192-
* The continuousEnumerationSettings to discovery the next available batch splits. Note: If the
192+
* The continuousUnBoundingSettings to discovery the next available batch splits. Note: If the
193193
* value was set, the {@link #jdbcParameterValuesProvider} must specified with the {@link
194194
* org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}.
195195
*/
196-
public JdbcSourceBuilder<OUT> setContinuousEnumerationSettings(
197-
ContinuousEnumerationSettings continuousEnumerationSettings) {
198-
this.continuousEnumerationSettings = continuousEnumerationSettings;
196+
public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings(
197+
ContinuousUnBoundingSettings continuousUnBoundingSettings) {
198+
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
199199
return this;
200200
}
201201

202202
/**
203203
* If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's
204-
* required to specify the {@link #continuousEnumerationSettings}.
204+
* required to specify the {@link #continuousUnBoundingSettings}.
205205
*/
206206
public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
207207
@Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
@@ -296,7 +296,7 @@ public JdbcSource<OUT> build() {
296296
Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null.");
297297
Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null.");
298298

299-
if (Objects.nonNull(continuousEnumerationSettings)) {
299+
if (Objects.nonNull(continuousUnBoundingSettings)) {
300300
Preconditions.checkArgument(
301301
Objects.nonNull(jdbcParameterValuesProvider)
302302
&& jdbcParameterValuesProvider
@@ -307,7 +307,7 @@ public JdbcSource<OUT> build() {
307307
if (Objects.nonNull(jdbcParameterValuesProvider)
308308
&& jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) {
309309
Preconditions.checkArgument(
310-
Objects.nonNull(continuousEnumerationSettings),
310+
Objects.nonNull(continuousUnBoundingSettings),
311311
INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
312312
}
313313

@@ -321,6 +321,6 @@ public JdbcSource<OUT> build() {
321321
resultExtractor,
322322
typeInformation,
323323
deliveryGuarantee,
324-
continuousEnumerationSettings);
324+
continuousUnBoundingSettings);
325325
}
326326
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.api.connector.source.SplitEnumerator;
2424
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2525
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
26-
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
26+
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
2727
import org.apache.flink.util.Preconditions;
2828

2929
import org.slf4j.Logger;
@@ -51,18 +51,18 @@ public class JdbcSourceEnumerator
5151
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
5252
private final List<JdbcSourceSplit> unassigned;
5353
private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator;
54-
private final @Nullable ContinuousEnumerationSettings continuousEnumerationSettings;
54+
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
5555

5656
public JdbcSourceEnumerator(
5757
SplitEnumeratorContext<JdbcSourceSplit> context,
5858
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
59-
ContinuousEnumerationSettings continuousEnumerationSettings,
59+
ContinuousUnBoundingSettings continuousUnBoundingSettings,
6060
List<JdbcSourceSplit> unassigned) {
6161
this.context = Preconditions.checkNotNull(context);
6262
this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator);
63-
this.continuousEnumerationSettings = continuousEnumerationSettings;
63+
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
6464
this.boundedness =
65-
Objects.isNull(continuousEnumerationSettings)
65+
Objects.isNull(continuousUnBoundingSettings)
6666
? Boundedness.BOUNDED
6767
: Boundedness.CONTINUOUS_UNBOUNDED;
6868
this.unassigned = Preconditions.checkNotNull(unassigned);
@@ -73,12 +73,12 @@ public JdbcSourceEnumerator(
7373
public void start() {
7474
sqlSplitEnumerator.open();
7575
if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED
76-
&& Objects.nonNull(continuousEnumerationSettings)) {
76+
&& Objects.nonNull(continuousUnBoundingSettings)) {
7777
context.callAsync(
7878
() -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0),
7979
this::processNewSplits,
80-
continuousEnumerationSettings.getInitialDiscoveryDelay().toMillis(),
81-
continuousEnumerationSettings.getDiscoveryInterval().toMillis());
80+
continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(),
81+
continuousUnBoundingSettings.getDiscoveryInterval().toMillis());
8282
} else {
8383
try {
8484
unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true));

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,26 @@ private RecordsWithSplitIds<RecordAndOffset<T>> finishSplit() {
170170
}
171171

172172
private void closeResultSetAndStatement() {
173+
closeResultSetIfNeeded();
174+
closeStatementIfNeeded();
175+
}
176+
177+
private void closeResultSetIfNeeded() {
173178
try {
174179
if (resultSet != null && !resultSet.isClosed()) {
175180
resultSet.close();
176181
}
182+
resultSet = null;
183+
} catch (SQLException e) {
184+
throw new RuntimeException(e);
185+
}
186+
}
187+
188+
private void closeStatementIfNeeded() {
189+
try {
177190
if (statement != null && !statement.isClosed()) {
178191
statement.close();
179192
}
180-
resultSet = null;
181193
statement = null;
182194
} catch (SQLException e) {
183195
throw new RuntimeException(e);
@@ -298,6 +310,7 @@ private void openResultSetForSplitWhenAtMostOnce(JdbcSourceSplit split)
298310
private void openResultSetForSplitWhenExactlyOnce(JdbcSourceSplit split)
299311
throws SQLException, ClassNotFoundException {
300312
getOrEstablishConnection();
313+
closeResultSetIfNeeded();
301314
prepareStatement(split);
302315
resultSet = statement.executeQuery();
303316
currentSplitOffset = 0;
@@ -330,6 +343,7 @@ private void moveResultSetCursorByOffset() throws SQLException {
330343
private void openResultSetForSplitWhenAtLeastOnce(JdbcSourceSplit split)
331344
throws SQLException, ClassNotFoundException {
332345
getOrEstablishConnection();
346+
closeResultSetIfNeeded();
333347
prepareStatement(split);
334348
resultSet = statement.executeQuery();
335349
// AT_LEAST_ONCE
@@ -338,6 +352,7 @@ private void openResultSetForSplitWhenAtLeastOnce(JdbcSourceSplit split)
338352
}
339353

340354
private void prepareStatement(JdbcSourceSplit split) throws SQLException {
355+
closeStatementIfNeeded();
341356
statement =
342357
connection.prepareStatement(
343358
split.getSqlTemplate(), resultSetType, resultSetConcurrency);

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousEnumerationSettings.java flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@
3131
* continuous discovery and streaming mode.
3232
*/
3333
@PublicEvolving
34-
public final class ContinuousEnumerationSettings implements Serializable {
34+
public final class ContinuousUnBoundingSettings implements Serializable {
3535

3636
private static final long serialVersionUID = 1L;
3737

3838
private final Duration initialDiscoveryDelay;
3939
private final Duration discoveryInterval;
4040

41-
public ContinuousEnumerationSettings(
41+
public ContinuousUnBoundingSettings(
4242
Duration initialDiscoveryDelay, Duration discoveryInterval) {
4343
this.initialDiscoveryDelay = initialDiscoveryDelay;
4444
this.discoveryInterval = checkNotNull(discoveryInterval);
@@ -56,7 +56,7 @@ public Duration getInitialDiscoveryDelay() {
5656

5757
@Override
5858
public String toString() {
59-
return "ContinuousEnumerationSettings{"
59+
return "ContinuousUnBoundingSettings{"
6060
+ "initialDiscoveryDelay="
6161
+ initialDiscoveryDelay
6262
+ ", discoveryInterval="
@@ -74,7 +74,7 @@ public boolean equals(Object object) {
7474
return false;
7575
}
7676

77-
ContinuousEnumerationSettings that = (ContinuousEnumerationSettings) object;
77+
ContinuousUnBoundingSettings that = (ContinuousUnBoundingSettings) object;
7878
return Objects.equals(initialDiscoveryDelay, that.initialDiscoveryDelay)
7979
&& Objects.equals(discoveryInterval, that.discoveryInterval);
8080
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
2727
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
2828
import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
29-
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
29+
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
3030
import org.apache.flink.types.Row;
3131

3232
import org.junit.jupiter.api.Test;
@@ -152,8 +152,8 @@ void testSetStreamingSemantic() {
152152
assertThatThrownBy(
153153
() ->
154154
sourceBuilder
155-
.setContinuousEnumerationSettings(
156-
new ContinuousEnumerationSettings(
155+
.setContinuousUnBoundingSettings(
156+
new ContinuousUnBoundingSettings(
157157
Duration.ofMillis(1L),
158158
Duration.ofMillis(1L)))
159159
.build())
@@ -170,7 +170,7 @@ void testSetStreamingSemantic() {
170170
.isInstanceOf(IllegalArgumentException.class)
171171
.hasMessage(JdbcSourceBuilder.INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
172172

173-
sourceBuilder.setContinuousEnumerationSettings(null);
173+
sourceBuilder.setContinuousUnBoundingSettings(null);
174174
assertThatThrownBy(
175175
() ->
176176
sourceBuilder

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@
2727
import org.apache.flink.client.program.ClusterClient;
2828
import org.apache.flink.configuration.Configuration;
2929
import org.apache.flink.connector.base.DeliveryGuarantee;
30-
import org.apache.flink.connector.jdbc.databases.mysql.MySqlTestBase;
30+
import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase;
3131
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
3232
import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
3333
import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
34-
import org.apache.flink.connector.jdbc.utils.ContinuousEnumerationSettings;
34+
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
3535
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3636
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
3737
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
3838
import org.apache.flink.test.junit5.InjectClusterClient;
3939
import org.apache.flink.util.Collector;
4040

41+
import org.junit.jupiter.api.AfterEach;
4142
import org.junit.jupiter.api.BeforeEach;
4243
import org.junit.jupiter.api.Test;
4344
import org.junit.jupiter.params.ParameterizedTest;
@@ -65,7 +66,7 @@
6566
import static org.assertj.core.api.Assertions.assertThat;
6667

6768
/** Test for streaming semantic related cases of {@link JdbcSource}. */
68-
class JdbcSourceStreamRelatedITCase implements MySqlTestBase, JdbcITCaseBase {
69+
class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
6970

7071
private static final long ONE_SECOND = Duration.ofSeconds(1L).toMillis();
7172
private static final int TESTING_PARALLELISM = 2;
@@ -75,14 +76,14 @@ class JdbcSourceStreamRelatedITCase implements MySqlTestBase, JdbcITCaseBase {
7576
(int) (ONE_SECOND / INTERVAL_OF_GENERATING + 1);
7677
private static final String testingTable = "t_testing";
7778
private static final String CREATE_SQL =
78-
"CREATE TABLE if not exists "
79+
"CREATE TABLE "
7980
+ testingTable
8081
+ " ("
8182
+ "id bigint NOT NULL, "
8283
+ "ts bigint NOT NULL, "
8384
+ "PRIMARY KEY (id))";
84-
private static final ContinuousEnumerationSettings CONTINUOUS_SETTINGS =
85-
new ContinuousEnumerationSettings(Duration.ofMillis(10L), Duration.ofSeconds(1L));
85+
private static final ContinuousUnBoundingSettings CONTINUOUS_SETTINGS =
86+
new ContinuousUnBoundingSettings(Duration.ofMillis(10L), Duration.ofSeconds(1L));
8687
private static final ResultExtractor<TestEntry> EXTRACTOR =
8788
resultSet -> new TestEntry(resultSet.getLong("id"), resultSet.getLong("ts"));
8889
private static final List<TestEntry> testEntries = new ArrayList<>(TESTING_ENTRIES_SIZE);
@@ -97,7 +98,6 @@ class JdbcSourceStreamRelatedITCase implements MySqlTestBase, JdbcITCaseBase {
9798
void initData() {
9899
testEntries.clear();
99100
quickExecutionSQL(CREATE_SQL);
100-
quickExecutionSQL("delete from " + testingTable);
101101
generateTestEntries();
102102
String insertSQL = generateInsertSQL();
103103
quickExecutionSQL(insertSQL);
@@ -112,14 +112,20 @@ void initData() {
112112
.setDBUrl(getMetadata().getJdbcUrl())
113113
.setUsername(getMetadata().getUsername())
114114
.setPassword(getMetadata().getPassword())
115-
.setContinuousEnumerationSettings(CONTINUOUS_SETTINGS)
115+
.setContinuousUnBoundingSettings(CONTINUOUS_SETTINGS)
116116
.setJdbcParameterValuesProvider(slideTimingParamsProvider)
117117
.setDriverName(getMetadata().getDriverClass())
118118
.setResultExtractor(EXTRACTOR);
119119

120120
collectedRecords = new ConcurrentLinkedDeque<>();
121121
}
122122

123+
@AfterEach
124+
void clearData() {
125+
quickExecutionSQL("delete from " + testingTable);
126+
quickExecutionSQL("DROP TABLE " + testingTable);
127+
}
128+
123129
@ParameterizedTest
124130
@EnumSource(DeliveryGuarantee.class)
125131
void testForNormalCaseWithoutFailure(

0 commit comments

Comments
 (0)