Skip to content

Commit

Permalink
Return RetriableRequestException for Netty Max Active Stream error (#…
Browse files Browse the repository at this point in the history
…1001)

* Return RetriableRequestException for Netty Max Active Stream error

* Use Netty H2 Exception in condition checking. Add unit test.

* Refactor code

* Update changelog & bump version

* Update version for testing

* Revert the release-candidate version

---------

Co-authored-by: xintwu <[email protected]>
  • Loading branch information
xt-dev and xintwu authored May 24, 2024
1 parent 5128dd5 commit 63ba831
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 6 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.55.0] - 2024-05-23
- Allow HttpBridge to return RetriableRequestException for the Netty max active stream error

## [29.54.0] - 2024-05-08
- Dual read monitors cluster uris similarity

Expand Down Expand Up @@ -5689,7 +5692,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.55.0...master
[29.55.0]: https://github.com/linkedin/rest.li/compare/v29.54.0...v29.55.0
[29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0
[29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1
[29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.54.0
version=29.55.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
1 change: 1 addition & 0 deletions r2-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
compile externalDependency.servletApi
compile externalDependency.mail
compile externalDependency.javaxActivation
compile externalDependency.netty
testCompile project(':r2-testutils')
testCompile project(':test-util')
testCompile externalDependency.testng
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.RetriableRequestException;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.rest.RestException;
import com.linkedin.r2.message.rest.RestRequest;
Expand All @@ -32,6 +33,7 @@
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;

import io.netty.handler.codec.http2.Http2Exception;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
Expand All @@ -42,6 +44,9 @@
*/
public class HttpBridge
{
public static final String NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE =
"Maximum active streams violated for this endpoint";

/**
* Wrap application callback for incoming RestResponse with a "generic" HTTP callback.
*
Expand Down Expand Up @@ -132,6 +137,8 @@ public void onResponse(TransportResponse<RestResponse> response)

/**
* Wrap application callback for incoming StreamResponse with a "generic" HTTP callback.
* If callback returns the error which is in Netty Http2Exception.StreamException type,
* populate RetriableRequestException instead of RemoteInvocationException.
*
* @param callback the callback to receive the incoming RestResponse
* @param request the request, used only to provide useful context in case an error
Expand All @@ -149,11 +156,12 @@ public void onResponse(TransportResponse<StreamResponse> response)
{
if (response.hasError())
{
Throwable responseError = response.getError();
// If the error is due to the netty max active stream error, wrap it with RetriableRequestException instead
RemoteInvocationException exception =
wrapResponseError("Failed to get response from server for URI " + uri, responseError);
response =
TransportResponseImpl.error(new RemoteInvocationException("Failed to get response from server for URI "
+ uri,
response.getError()),
response.getWireAttributes());
TransportResponseImpl.error(exception, response.getWireAttributes());
}
else if (!RestStatus.isOK(response.getResponse().getStatus()))
{
Expand Down Expand Up @@ -209,6 +217,35 @@ public void onResponse(TransportResponse<StreamResponse> response)
};
}

/**
* Check if the error is due to the netty max active stream error.
* @param responseError Throwable error to check
* @return True if the error is due to the netty max active stream error, false otherwise
*/
private static boolean shouldReturnRetriableRequestException(Throwable responseError)
{
return responseError instanceof Http2Exception.StreamException
&& responseError.getMessage().contains(NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE);
}

/**
* Wrap the response error with the appropriate exception type.
* If the error is due to the netty max active stream, wrap it with RetriableRequestException.
* @param errorMessage Error message to wrap
* @param responseError Throwable error to wrap
* @return RemoteInvocationException or RetriableRequestException
*/
private static RemoteInvocationException wrapResponseError(String errorMessage, Throwable responseError) {
if (shouldReturnRetriableRequestException(responseError))
{
return new RetriableRequestException(errorMessage, responseError);
}
else
{
return new RemoteInvocationException(errorMessage, responseError);
}
}

/**
* Gets the URI to display in exception messages. The query parameters part of the URI is omitted to prevent
* displaying sensitive information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package test.r2.transport.http.common;

import com.linkedin.r2.RetriableRequestException;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,13 +46,18 @@
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.common.HttpBridge;

import static com.linkedin.r2.transport.http.common.HttpBridge.NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE;


/**
* @author Steven Ihde
* @version $Revision: $
*/

public class TestHttpBridge
{
private static final int REGULAR_STREAM_ID = 2; // Can not be 0 or 1 as they are reserved in Netty

@Test
public void testRestToHttpErrorMessage() throws TimeoutException, InterruptedException
{
Expand Down Expand Up @@ -144,4 +152,31 @@ public void testHttpToStreamErrorMessage() throws TimeoutException, InterruptedE
// propagating the actual exception
Assert.assertSame(resp, streamResponse);
}

@Test
public void testStreamToHttpWithRetriableRequestException() throws TimeoutException, InterruptedException
{
URI uri = URI.create("http://some.host");

RestRequest r = new RestRequestBuilder(uri).build();

FutureCallback<StreamResponse> futureCallback = new FutureCallback<>();
TransportCallback<StreamResponse> callback = new TransportCallbackAdapter<>(futureCallback);
TransportCallback<StreamResponse> bridgeCallback = HttpBridge.streamToHttpCallback(callback,
Messages.toStreamRequest(r));

bridgeCallback.onResponse(TransportResponseImpl.<StreamResponse>error(
Http2Exception.streamError(REGULAR_STREAM_ID, Http2Error.REFUSED_STREAM,
NETTY_MAX_ACTIVE_STREAM_ERROR_MESSAGE + ": 200")));

try
{
futureCallback.get(30, TimeUnit.SECONDS);
Assert.fail("get should have thrown exception");
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof RetriableRequestException);
}
}
}

0 comments on commit 63ba831

Please sign in to comment.