
Commit b0eb3e9

dongjoon-hyun authored and haoyangeng-db committed
[SPARK-52874][CORE] Support o.a.s.util.Pair Java Record
### What changes were proposed in this pull request?

This PR aims to add `o.a.s.util.Pair` Java Record for Java code and to enforce it.

```scala
scala> val pair = org.apache.spark.util.Pair.of(1,2);
val pair: org.apache.spark.util.Pair[Int,Int] = Pair[getLeft=1, getRight=2]

scala> pair.getLeft()
val res0: Int = 1

scala> pair.getRight()
val res1: Int = 2
```

### Why are the changes needed?

Since Java 16, we can use `Records` officially in Java.

| VERSION | JEP |
| - | - |
| Java 14 | [JEP 359: Records (Preview)](https://openjdk.org/jeps/359) |
| Java 15 | [JEP 384: Records (Second Preview)](https://openjdk.org/jeps/384) |
| Java 16 | [JEP 395: Records](https://openjdk.org/jeps/395) |

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs to make sure all tests passed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51568 from dongjoon-hyun/SPARK-52874.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
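For reference, here is a minimal Java sketch (not part of this commit) of the same calls shown above from the Scala REPL; it assumes only the `org.apache.spark.util.Pair` record added by this PR:

```java
import org.apache.spark.util.Pair;

public class PairExample {
  public static void main(String[] args) {
    // Pair.of is the static factory; getLeft/getRight are the record accessors.
    Pair<String, Integer> pair = Pair.of("answer", 42);
    System.out.println(pair.getLeft());   // answer
    System.out.println(pair.getRight());  // 42
    // Records derive toString/equals/hashCode from their components.
    System.out.println(pair);                                // Pair[getLeft=answer, getRight=42]
    System.out.println(pair.equals(Pair.of("answer", 42)));  // true
  }
}
```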
1 parent ae24efe commit b0eb3e9

File tree: 10 files changed, +51 −22 lines


common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 5 additions & 6 deletions
@@ -26,8 +26,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
@@ -45,6 +43,7 @@
 import org.apache.spark.network.server.MessageHandler;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportFrameDecoder;
+import org.apache.spark.util.Pair;
 
 /**
  * Handler that processes server responses, in response to requests issued from a
@@ -96,7 +95,7 @@ public void removeRpcRequest(long requestId) {
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
     updateTimeOfLastRequest();
-    streamCallbacks.offer(ImmutablePair.of(streamId, callback));
+    streamCallbacks.offer(Pair.of(streamId, callback));
   }
 
   @VisibleForTesting
@@ -125,7 +124,7 @@ private void failOutstandingRequests(Throwable cause) {
     }
     for (Pair<String, StreamCallback> entry : streamCallbacks) {
       try {
-        entry.getValue().onFailure(entry.getKey(), cause);
+        entry.getRight().onFailure(entry.getLeft(), cause);
       } catch (Exception e) {
         logger.warn("StreamCallback.onFailure throws exception", e);
       }
@@ -236,7 +235,7 @@ public void handle(ResponseMessage message) throws Exception {
     } else if (message instanceof StreamResponse resp) {
       Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry.getValue();
+        StreamCallback callback = entry.getRight();
         if (resp.byteCount > 0) {
           StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
             this, resp.streamId, resp.byteCount, callback);
@@ -262,7 +261,7 @@ public void handle(ResponseMessage message) throws Exception {
     } else if (message instanceof StreamFailure resp) {
       Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry.getValue();
+        StreamCallback callback = entry.getRight();
         try {
           callback.onFailure(resp.streamId, new RuntimeException(resp.error));
         } catch (IOException ioe) {
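The hunks above show the whole migration pattern: `ImmutablePair.of(...)` becomes `Pair.of(...)`, and the `Map.Entry`-style accessors `getKey()`/`getValue()` (available on commons-lang3's `Pair` because it implements `Map.Entry`) become the record accessors `getLeft()`/`getRight()`. A small hedged sketch of the new accessor usage, with a hypothetical helper name:

```java
import org.apache.spark.util.Pair;

class StreamCallbackEntryExample {
  // Hypothetical helper, for illustration only: read both components of a queued entry
  // through the record accessors that replace getKey()/getValue().
  static String describe(Pair<String, Object> entry) {
    return entry.getLeft() + " -> " + entry.getRight();
  }
}
```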

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 2 additions & 3 deletions
@@ -26,13 +26,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.util.Pair;
 
 /**
  * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are
@@ -127,7 +126,7 @@ public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
       "Stream id and chunk index should be specified.";
     long streamId = Long.valueOf(array[0]);
     int chunkIndex = Integer.valueOf(array[1]);
-    return ImmutablePair.of(streamId, chunkIndex);
+    return Pair.of(streamId, chunkIndex);
   }
 
   @Override
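A hypothetical caller sketch (not in this commit) for the `parseStreamChunkId` signature shown above, which now returns the new record type:

```java
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.util.Pair;

class StreamChunkIdExample {
  static void printParts(String streamChunkId) {
    // parseStreamChunkId returns Pair<Long, Integer>; unpack it via the record accessors.
    Pair<Long, Integer> parsed = OneForOneStreamManager.parseStreamChunkId(streamChunkId);
    long streamId = parsed.getLeft();
    int chunkIndex = parsed.getRight();
    System.out.println("streamId=" + streamId + ", chunkIndex=" + chunkIndex);
  }
}
```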

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java

Lines changed: 2 additions & 3 deletions
@@ -27,15 +27,14 @@
 
 import static org.mockito.Mockito.*;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.protocol.*;
 import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.util.Pair;
 
 public class ChunkFetchRequestHandlerSuite {
 
@@ -54,7 +53,7 @@ public void handleChunkFetchRequest() throws Exception {
       .thenAnswer(invocationOnMock0 -> {
         Object response = invocationOnMock0.getArguments()[0];
         ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-        responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+        responseAndPromisePairs.add(Pair.of(response, channelFuture));
         return channelFuture;
       });
 

common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java

Lines changed: 2 additions & 3 deletions
@@ -26,8 +26,6 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -41,6 +39,7 @@
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.util.Pair;
 
 public class RpcIntegrationSuite {
   static TransportConf conf;
@@ -408,7 +407,7 @@ private Pair<Set<String>, Set<String>> checkErrorsContain(
         notFound.add(contain);
       }
     }
-    return new ImmutablePair<>(remainingErrors, notFound);
+    return new Pair<>(remainingErrors, notFound);
   }
 
   private static class VerifyingStreamCallback implements StreamCallbackWithID {

common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java

Lines changed: 3 additions & 4 deletions
@@ -28,8 +28,6 @@
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
@@ -39,6 +37,7 @@
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.util.Pair;
 
 public class TransportRequestHandlerSuite {
 
@@ -53,7 +52,7 @@ public void handleStreamRequest() throws Exception {
       .thenAnswer(invocationOnMock0 -> {
         Object response = invocationOnMock0.getArguments()[0];
         ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-        responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+        responseAndPromisePairs.add(Pair.of(response, channelFuture));
         return channelFuture;
       });
 
@@ -145,7 +144,7 @@ public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
     when(channel.writeAndFlush(any())).thenAnswer(invocationOnMock0 -> {
       Object response = invocationOnMock0.getArguments()[0];
       ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
-      responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
+      responseAndPromisePairs.add(Pair.of(response, channelFuture));
       return channelFuture;
     });
 

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.commons.lang3.tuple.Pair;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,6 +55,7 @@
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.util.Pair;
 
 /**
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -400,7 +400,7 @@ public Map<String, String[]> getLocalDirs(String appId, Set<String> execIds) {
        }
        return Pair.of(exec, info.localDirs);
      })
-      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
   }
 
   /**
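The last hunk relies on record component accessors working as ordinary method references. A self-contained sketch (illustrative names only) of the same `Collectors.toMap(Pair::getLeft, Pair::getRight)` pattern:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.util.Pair;

class CollectPairsExample {
  // Collect a list of pairs into a map keyed by the left component.
  static Map<String, Integer> toMap(List<Pair<String, Integer>> pairs) {
    return pairs.stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
  }
}
```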
org/apache/spark/util/Pair.java (new file)

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+/**
+ * An immutable pair of values. Note that the fields are intentionally designed to be `getLeft` and
+ * `getRight` instead of `left` and `right` in order to mitigate the migration burden
+ * from `org.apache.commons.lang3.tuple.Pair`.
+ */
+public record Pair<L, R>(L getLeft, R getRight) {
+  public static <L, R> Pair<L, R> of(L left, R right) {
+    return new Pair<>(left, right);
+  }
+}
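Because `Pair` is a record, `equals`, `hashCode`, and `toString` are generated from its two components, so instances behave as immutable value objects. A hedged sketch (not part of the commit) showing that this makes `Pair` directly usable as a map key:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.util.Pair;

class PairSemanticsExample {
  public static void main(String[] args) {
    Map<Pair<String, Integer>, String> byAppAndId = new HashMap<>();
    byAppAndId.put(Pair.of("app-1", 7), "executor-7");
    // A structurally equal Pair compares equal and hashes identically,
    // so it works as a lookup key with no extra code in the record.
    System.out.println(byAppAndId.get(Pair.of("app-1", 7)));  // executor-7
    System.out.println(Pair.of("a", 1));                      // Pair[getLeft=a, getRight=1]
  }
}
```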

common/utils/src/test/java/org/apache/spark/util/SparkLoggerSuiteBase.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 import java.nio.file.Files;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.Test;
 
dev/checkstyle.xml

Lines changed: 1 addition & 0 deletions
@@ -183,6 +183,7 @@
     <module name="IllegalImport">
       <property name="illegalPkgs" value="org.apache.log4j" />
       <property name="illegalPkgs" value="org.apache.commons.lang" />
+      <property name="illegalPkgs" value="org.apache.commons.lang3.tuple" />
       <property name="illegalClasses" value="org.apache.commons.lang3.JavaVersion" />
     </module>
     <module name="RegexpSinglelineJava">
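With the extra `illegalPkgs` entry, Checkstyle now rejects any Java import from `org.apache.commons.lang3.tuple`. A hypothetical example of a file that the rule would flag:

```java
// Flagged by the new IllegalImport rule: the whole org.apache.commons.lang3.tuple package is banned.
import org.apache.commons.lang3.tuple.ImmutablePair;

class BadImportExample {
  Object pair = ImmutablePair.of("stream-1", 0);  // compiles, but fails the style check
  // The compliant replacement: import org.apache.spark.util.Pair; ... Pair.of("stream-1", 0)
}
```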

scalastyle-config.xml

Lines changed: 5 additions & 0 deletions
@@ -294,6 +294,11 @@ This file is divided into 3 sections:
   Commons Lang 3 JavaVersion (org.apache.commons.lang3.JavaVersion)</customMessage>
   </check>
 
+  <check customId="commonslang3tuple" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">org\.apache\.commons\.lang3\.tuple</parameter></parameters>
+    <customMessage>Use org.apache.spark.util.Pair instead</customMessage>
+  </check>
+
   <check customId="uribuilder" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">UriBuilder\.fromUri</parameter></parameters>
     <customMessage>Use Utils.getUriBuilder instead.</customMessage>
