Skip to content

Commit 6d5c69d

Browse files
committed
[Hotfix][Connector/JDBC] Remove the unused 'offset' attribute of JdbcSourceSplit
1 parent 8641e67 commit 6d5c69d

File tree

8 files changed

+12
-32
lines changed

8 files changed

+12
-32
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ public List<JdbcSourceSplit> enumerateSplits() throws IOException {
5454

5555
if (parameterValuesProvider == null) {
5656
return Collections.singletonList(
57-
new JdbcSourceSplit(
58-
getNextId(), sqlTemplate, null, 0, new CheckpointedOffset()));
57+
new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset()));
5958
}
6059

6160
if (optionalSqlSplitEnumeratorState != null) {
@@ -68,15 +67,14 @@ public List<JdbcSourceSplit> enumerateSplits() throws IOException {
6867

6968
if (parameters == null) {
7069
return Collections.singletonList(
71-
new JdbcSourceSplit(
72-
getNextId(), sqlTemplate, null, 0, new CheckpointedOffset()));
70+
new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset()));
7371
}
7472

7573
List<JdbcSourceSplit> jdbcSourceSplitList = new ArrayList<>(parameters.length);
7674
for (Serializable[] paramArr : parameters) {
7775
jdbcSourceSplitList.add(
7876
new JdbcSourceSplit(
79-
getNextId(), sqlTemplate, paramArr, 0, new CheckpointedOffset()));
77+
getNextId(), sqlTemplate, paramArr, new CheckpointedOffset()));
8078
}
8179
return jdbcSourceSplitList;
8280
}

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -39,35 +39,27 @@ public class JdbcSourceSplit implements SourceSplit, Serializable {
3939

4040
private final @Nullable Serializable[] parameters;
4141

42-
private final int offset;
43-
4442
private final @Nullable CheckpointedOffset checkpointedOffset;
4543

4644
public JdbcSourceSplit(
4745
String id,
4846
String sqlTemplate,
4947
@Nullable Serializable[] parameters,
50-
int offset,
5148
@Nullable CheckpointedOffset checkpointedOffset) {
5249
this.id = id;
5350
this.sqlTemplate = sqlTemplate;
5451
this.parameters = parameters;
55-
this.offset = offset;
5652
this.checkpointedOffset = checkpointedOffset;
5753
}
5854

59-
public int getOffset() {
60-
return offset;
61-
}
62-
6355
@Nullable
6456
public CheckpointedOffset getCheckpointedOffset() {
6557
return checkpointedOffset;
6658
}
6759

6860
public JdbcSourceSplit updateWithCheckpointedPosition(
6961
@Nullable CheckpointedOffset checkpointedOffset) {
70-
return new JdbcSourceSplit(id, sqlTemplate, parameters, offset, checkpointedOffset);
62+
return new JdbcSourceSplit(id, sqlTemplate, parameters, checkpointedOffset);
7163
}
7264

7365
public Optional<CheckpointedOffset> getReaderPositionOptional() {
@@ -105,16 +97,15 @@ public boolean equals(Object o) {
10597
return false;
10698
}
10799
JdbcSourceSplit that = (JdbcSourceSplit) o;
108-
return offset == that.offset
109-
&& Objects.equals(id, that.id)
100+
return Objects.equals(id, that.id)
110101
&& Objects.equals(sqlTemplate, that.sqlTemplate)
111102
&& Arrays.equals(parameters, that.parameters)
112103
&& Objects.equals(checkpointedOffset, that.checkpointedOffset);
113104
}
114105

115106
@Override
116107
public int hashCode() {
117-
int result = Objects.hash(id, sqlTemplate, offset, checkpointedOffset);
108+
int result = Objects.hash(id, sqlTemplate, checkpointedOffset);
118109
result = 31 * result + Arrays.hashCode(parameters);
119110
return result;
120111
}
@@ -130,8 +121,6 @@ public String toString() {
130121
+ '\''
131122
+ ", parameters="
132123
+ Arrays.toString(parameters)
133-
+ ", offset="
134-
+ offset
135124
+ ", checkpointedOffset="
136125
+ checkpointedOffset
137126
+ '}';

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ public void serializeJdbcSourceSplit(DataOutputStream out, JdbcSourceSplit sourc
8181
out.writeInt(paramsBytes.length);
8282
out.write(paramsBytes);
8383

84-
out.writeInt(sourceSplit.getOffset());
85-
8684
CheckpointedOffset checkpointedOffset = sourceSplit.getCheckpointedOffset();
8785
byte[] chkOffset = InstantiationUtil.serializeObject(checkpointedOffset);
8886
out.writeInt(chkOffset.length);
@@ -100,14 +98,12 @@ public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
10098
InstantiationUtil.deserializeObject(
10199
parametersBytes, in.getClass().getClassLoader());
102100

103-
int offset = in.readInt();
104-
105101
int chkOffsetBytesLen = in.readInt();
106102
byte[] chkOffsetBytes = new byte[chkOffsetBytesLen];
107103
in.read(chkOffsetBytes);
108104
CheckpointedOffset chkOffset =
109105
InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader());
110106

111-
return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
107+
return new JdbcSourceSplit(id, sqlTemplate, params, chkOffset);
112108
}
113109
}

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ class JdbcSourceEnumStateSerializerTest {
3737

3838
private final JdbcSourceEnumeratorState state =
3939
new JdbcSourceEnumeratorState(
40-
Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 0, null)),
40+
Arrays.asList(new JdbcSourceSplit("1", "select 1", null, null)),
4141
Arrays.asList(
4242
new JdbcSourceSplit(
4343
"1",
4444
"select 1",
4545
new Serializable[] {new Integer(0)},
46-
10,
4746
new CheckpointedOffset(0, 10))),
48-
Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 0, null)),
47+
Arrays.asList(new JdbcSourceSplit("1", "select 1", null, null)),
4948
null);
5049
private final JdbcSourceEnumeratorState mockedState = new MockedJdbcSourceEnumState(state);
5150
private final JdbcSourceEnumStateSerializer serializer =

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ private static JdbcSourceSplit createRandomSplit() {
9393
String.valueOf(splitId++),
9494
"select 1",
9595
new Serializable[] {0},
96-
0,
9796
new CheckpointedOffset(0, 0));
9897
}
9998

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void testNoSplitRequestWhenSplitRestored() throws Exception {
5151
final TestingReaderContext context = new TestingReaderContext();
5252
final JdbcSourceReader<String> reader = createReader(context);
5353
reader.addSplits(
54-
Collections.singletonList(new JdbcSourceSplit("1", "select 1", null, 0, null)));
54+
Collections.singletonList(new JdbcSourceSplit("1", "select 1", null, null)));
5555
reader.start();
5656
reader.close();
5757
assertThat(context.getNumSplitRequests()).isEqualTo(0);

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class JdbcSourceSplitReaderTest extends JdbcDataTestBase {
4949

5050
private final JdbcSourceSplit split =
5151
new JdbcSourceSplit(
52-
"1", "select id, title, author, price, qty from " + INPUT_TABLE, null, 0, null);
52+
"1", "select id, title, author, price, qty from " + INPUT_TABLE, null, null);
5353
private final JdbcConnectionProvider connectionProvider =
5454
new SimpleJdbcConnectionProvider(
5555
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
/** Test for {@link JdbcSourceSplitSerializer}. */
3131
class JdbcSourceSplitSerializerTest {
3232

33-
private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", null, 0, null);
33+
private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", null, null);
3434
private final JdbcSourceSplit mockedSplit = new MockedJdbcSourceSplit(split);
3535
private final JdbcSourceSplitSerializer serializer = new JdbcSourceSplitSerializer();
3636
private final JdbcSourceSplitSerializer mockedSerializer =
@@ -73,7 +73,6 @@ public MockedJdbcSourceSplit(JdbcSourceSplit jdbcSourceSplit) {
7373
jdbcSourceSplit.splitId(),
7474
jdbcSourceSplit.getSqlTemplate(),
7575
(Serializable[]) jdbcSourceSplit.getParameters(),
76-
jdbcSourceSplit.getOffset(),
7776
jdbcSourceSplit.getCheckpointedOffset());
7877
}
7978
}

0 commit comments

Comments
 (0)