Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17709. [ARR] Add async responder and async handler performance metrics. #7292

Open
wants to merge 18 commits into
base: HDFS-17531
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a3e5258
HDFS-17543. [ARR] AsyncUtil makes asynchronous code more concise and …
KeeProMise Jul 1, 2024
ff2a574
HADOOP-19235. IPC client uses CompletableFuture to support asynchrono…
KeeProMise Jul 24, 2024
9217317
HDFS-17544. [ARR] The router client rpc protocol PB supports asynchro…
KeeProMise Aug 2, 2024
71c7466
HDFS-17545. [ARR] router async rpc client. (#6871). Contributed by Ji…
KeeProMise Sep 27, 2024
724c7d3
HDFS-17594. [ARR] RouterCacheAdmin supports asynchronous rpc. (#6986)…
Archie-wang Oct 15, 2024
1152442
HDFS-17597. [ARR] RouterSnapshot supports asynchronous rpc. (#6994). …
LeoLeeeeee Oct 16, 2024
2aaa117
HDFS-17595. [ARR] ErasureCoding supports asynchronous rpc. (#6983). C…
hfutatzhanghb Oct 16, 2024
118061c
HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. (#7108).…
hfutatzhanghb Nov 9, 2024
ea3c4c8
HDFS-17596. [ARR] RouterStoragePolicy supports asynchronous rpc. (#6…
hfutatzhanghb Nov 11, 2024
33c661b
HDFS-17656. [ARR] RouterNamenodeProtocol and RouterUserProtocol suppo…
KeeProMise Nov 25, 2024
d8768cf
HDFS-17659. [ARR]Router Quota supports asynchronous rpc. (#7157). Con…
hfutatzhanghb Nov 25, 2024
d4a6a27
HDFS-17672. [ARR] Move asynchronous related classes to the async pack…
KeeProMise Nov 28, 2024
be06adc
HADOOP-19361. RPC DeferredMetrics bugfix. (#7220). Contributed by hfu…
hfutatzhanghb Dec 10, 2024
c48db62
HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)
hfutatzhanghb Dec 31, 2024
273673c
HDFS-17650. [ARR] The router server-side rpc protocol PB supports asy…
hfutatzhanghb Jan 9, 2025
02381aa
HDFS-17709. [ARR] Add async responder performance metrics.
hfutatzhanghb Jan 16, 2025
ce79a24
fix checkstyle.
hfutatzhanghb Jan 16, 2025
1073071
trigger yetus.
hfutatzhanghb Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
= new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<CompletableFuture<Writable>> ASYNC_RPC_RESPONSE
= new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
Expand All @@ -110,7 +110,46 @@ protected Boolean initialValue() {
@Unstable
public static <T extends Writable> AsyncGet<T, IOException>
getAsyncRpcResponse() {
return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
CompletableFuture<Writable> responseFuture = ASYNC_RPC_RESPONSE.get();
return new AsyncGet<T, IOException>() {
@Override
public T get(long timeout, TimeUnit unit)
throws IOException, TimeoutException, InterruptedException {
try {
if (unit == null || timeout < 0) {
return (T) responseFuture.get();
}
return (T) responseFuture.get(timeout, unit);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
}
throw new IllegalStateException(e);
}
}

@Override
public boolean isDone() {
return responseFuture.isDone();
}
};
}

/**
* Retrieves the current response future from the thread-local storage.
*
* @return A {@link CompletableFuture} of type T that represents the
* asynchronous operation. If no response future is present in
* the thread-local storage, this method returns {@code null}.
* @param <T> The type of the value completed by the returned
* {@link CompletableFuture}. It must be a subclass of
* {@link Writable}.
* @see CompletableFuture
* @see Writable
*/
public static <T extends Writable> CompletableFuture<T> getResponseFuture() {
return (CompletableFuture<T>) ASYNC_RPC_RESPONSE.get();
}

/**
Expand Down Expand Up @@ -277,10 +316,8 @@ static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
private final CompletableFuture<Writable> rpcResponseFuture;
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;

Expand All @@ -304,6 +341,7 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
}

this.externalHandler = EXTERNAL_CALL_HANDLER.get();
this.rpcResponseFuture = new CompletableFuture<>();
}

@Override
Expand All @@ -314,9 +352,6 @@ public String toString() {
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller

if (externalHandler != null) {
synchronized (externalHandler) {
externalHandler.notify();
Expand All @@ -339,7 +374,7 @@ public synchronized void setAlignmentContext(AlignmentContext ac) {
* @param error exception thrown by the call; either local or remote
*/
public synchronized void setException(IOException error) {
this.error = error;
rpcResponseFuture.completeExceptionally(error);
callComplete();
}

Expand All @@ -349,13 +384,9 @@ public synchronized void setException(IOException error) {
* @param rpcResponse return value of the rpc call.
*/
public synchronized void setRpcResponse(Writable rpcResponse) {
this.rpcResponse = rpcResponse;
rpcResponseFuture.complete(rpcResponse);
callComplete();
}

public synchronized Writable getRpcResponse() {
return rpcResponse;
}
}

/** Thread that reads responses and notifies callers. Each connection owns a
Expand Down Expand Up @@ -1495,39 +1526,19 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
}

if (isAsynchronousMode()) {
final AsyncGet<Writable, IOException> asyncGet
= new AsyncGet<Writable, IOException>() {
@Override
public Writable get(long timeout, TimeUnit unit)
throws IOException, TimeoutException{
boolean done = true;
try {
final Writable w = getRpcResponse(call, connection, timeout, unit);
if (w == null) {
done = false;
throw new TimeoutException(call + " timed out "
+ timeout + " " + unit);
}
return w;
} finally {
if (done) {
releaseAsyncCall();
CompletableFuture<Writable> result = call.rpcResponseFuture.handle(
(rpcResponse, e) -> {
releaseAsyncCall();
if (e != null) {
IOException ioe = (IOException) e;
throw new CompletionException(warpIOException(ioe, connection));
}
}
}

@Override
public boolean isDone() {
synchronized (call) {
return call.done;
}
}
};

ASYNC_RPC_RESPONSE.set(asyncGet);
return rpcResponse;
});
ASYNC_RPC_RESPONSE.set(result);
return null;
} else {
return getRpcResponse(call, connection, -1, null);
return getRpcResponse(call, connection);
}
}

Expand Down Expand Up @@ -1564,37 +1575,34 @@ int getAsyncCallCount() {
}

/** @return the rpc response or, in case of timeout, null. */
private Writable getRpcResponse(final Call call, final Connection connection,
final long timeout, final TimeUnit unit) throws IOException {
synchronized (call) {
while (!call.done) {
try {
AsyncGet.Util.wait(call, timeout, unit);
if (timeout >= 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
}
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
try {
return call.rpcResponseFuture.get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw warpIOException((IOException) cause, connection);
}
throw new IllegalStateException(e);
}
}

if (call.error != null) {
if (call.error instanceof RemoteException ||
call.error instanceof SaslException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
call.error);
}
} else {
return call.getRpcResponse();
}
private IOException warpIOException(IOException ioe, Connection connection) {
if (ioe instanceof RemoteException ||
ioe instanceof SaslException) {
ioe.fillInStackTrace();
return ioe;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
return NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
ioe);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
startTime = Time.monotonicNow();
}

if (args.length != 2) { // RpcController + Message
Expand Down Expand Up @@ -267,7 +267,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
}

if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
long callTime = Time.monotonicNow() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}

Expand Down Expand Up @@ -399,19 +399,19 @@ public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.setupTime = Time.monotonicNow();
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long processingTime = Time.monotonicNow() - setupTime;
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
long processingTime = Time.monotonicNow() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
call.setDeferredError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
startTime = Time.monotonicNow();
}

if (args.length != 2) { // RpcController + Message
Expand Down Expand Up @@ -277,7 +277,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
}

if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
long callTime = Time.monotonicNow() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}

Expand Down Expand Up @@ -431,19 +431,19 @@ static class ProtobufRpcEngineCallbackImpl
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.setupTime = Time.monotonicNow();
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long processingTime = Time.monotonicNow() - setupTime;
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
long processingTime = Time.monotonicNow() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
call.setDeferredError(t);
Expand Down
Loading
Loading