Skip to content

Commit

Permalink
Support cancelling in flight REST requests
Browse files Browse the repository at this point in the history
Fixes: #41971
  • Loading branch information
geoand committed Feb 20, 2025
1 parent 6bc9e3c commit df4da57
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void handle(RestClientRequestContext requestContext) {
future.subscribe().with(new Consumer<>() {
@Override
public void accept(HttpClientRequest httpClientRequest) {
requestContext.setHttpClientRequest(httpClientRequest);

// adapt headers to HTTP/2 depending on the underlying HTTP connection
ClientSendRequestHandler.this.adaptRequest(httpClientRequest);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.jboss.resteasy.reactive.client.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -10,10 +12,11 @@
import jakarta.ws.rs.core.Response;

import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpClientRequest;

public class UniInvoker extends AbstractRxInvoker<Uni<?>> {

private InvocationBuilderImpl invocationBuilder;
private final InvocationBuilderImpl invocationBuilder;

public UniInvoker(InvocationBuilderImpl invocationBuilder) {
this.invocationBuilder = invocationBuilder;
Expand All @@ -22,10 +25,16 @@ public UniInvoker(InvocationBuilderImpl invocationBuilder) {
@Override
public <R> Uni<R> method(String name, Entity<?> entity, GenericType<R> responseType) {
AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx();
AtomicReference<RestClientRequestContext> restClientRequestContextRef = new AtomicReference<>();
return Uni.createFrom().completionStage(new Supplier<CompletionStage<R>>() {
@Override
public CompletionStage<R> get() {
return invoker.method(name, entity, responseType);
RestClientRequestContext restClientRequestContext = invoker.performRequestInternal(name, entity,
responseType == null ? new GenericType<>(String.class) : responseType,
true);
restClientRequestContextRef.set(restClientRequestContext);
CompletableFuture response = restClientRequestContext.getResult();
return invoker.mapResponse(response, responseType == null ? String.class : responseType.getRawType());
}
}).onFailure().transform(new Function<>() {
@Override
Expand All @@ -35,6 +44,18 @@ public Throwable apply(Throwable t) {
}
return t;
}
}).onCancellation().invoke(new Runnable() {
@Override
public void run() {
// be very defensive here as things could have been nulled out when the application is being torn down
RestClientRequestContext restClientRequestContext = restClientRequestContextRef.get();
if (restClientRequestContext != null) {
HttpClientRequest httpClientRequest = restClientRequestContext.getHttpClientRequest();
if (httpClientRequest != null) {
httpClientRequest.reset();
}
}
}
});
}

Expand Down

0 comments on commit df4da57

Please sign in to comment.