Skip to content

Add general contextual data #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
@@ -198,4 +198,3 @@ To make an interceptor available during the WebSocket handshake, use {@link io.v
----
{@link examples.HttpProxyExamples#webSocketInterceptorPath}
----

8 changes: 4 additions & 4 deletions src/main/java/examples/HttpProxyExamples.java
Original file line number Diff line number Diff line change
@@ -46,20 +46,20 @@ public void proxy(Vertx vertx) {
proxyServer.requestHandler(proxy).listen(8080);
}

private SocketAddress resolveOriginAddress(HttpServerRequest request) {
private SocketAddress resolveOriginAddress(ProxyContext proxyContext) {
return null;
}

public void originSelector(HttpProxy proxy) {
proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request)));
proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext)));
}

private RequestOptions resolveOriginOptions(HttpServerRequest request) {
private RequestOptions resolveOriginOptions(ProxyContext proxyContext) {
return null;
}

public void originRequestProvider(HttpProxy proxy) {
proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request)));
proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext)));
}

public void inboundInterceptor(HttpProxy proxy) {
46 changes: 41 additions & 5 deletions src/main/java/io/vertx/httpproxy/HttpProxy.java
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.httpproxy.impl.ReverseProxy;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

@@ -76,16 +78,40 @@ default HttpProxy origin(int port, String host) {
return origin(SocketAddress.inetSocketAddress(port, host));
}

// /**
// * Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
// *
// * @param selector the selector
// * @return a reference to this, so the API can be used fluently
// */
// @Fluent
// default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
// return originRequestProvider((req, client) -> selector
// .apply(req)
// .flatMap(server -> client.request(new RequestOptions().setServer(server))));
// }
//
// /**
// * Set a provider that creates the request to the <i><b>origin</b></i> server based the incoming HTTP request.
// * Setting a provider overrides any origin selector previously set.
// *
// * @param provider the provider
// * @return a reference to this, so the API can be used fluently
// */
// @GenIgnore()
// @Fluent
// HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);

/**
* Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
*
* @param selector the selector
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
return originRequestProvider((req, client) -> selector
.apply(req)
default HttpProxy originSelector(Function<ProxyContext, Future<SocketAddress>> selector) {
return originRequestProvider((context, client) -> selector
.apply(context)
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
}

@@ -98,7 +124,7 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
*/
@GenIgnore()
@Fluent
HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider);

/**
* Add an interceptor to the interceptor chain.
@@ -131,6 +157,16 @@ default HttpProxy addInterceptor(ProxyInterceptor interceptor) {
*
* @param request the outbound {@code HttpServerRequest}
*/
void handle(HttpServerRequest request);
default void handle(HttpServerRequest request) {
handle(request, new HashMap<>());
}

/**
* Handle the <i><b>outbound</b></i> {@code HttpServerRequest}.
*
* @param request the outbound {@code HttpServerRequest}
* @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable.
*/
void handle(HttpServerRequest request, Map<String, Object> attachments);

}
21 changes: 11 additions & 10 deletions src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ public class ReverseProxy implements HttpProxy {
private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class);
private final HttpClient client;
private final boolean supportWebSocket;
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private final List<ProxyInterceptorEntry> interceptors = new ArrayList<>();

public ReverseProxy(ProxyOptions options, HttpClient client) {
@@ -75,7 +75,7 @@ public Cache newCache(CacheOptions options, Vertx vertx) {
}

@Override
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
public HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider) {
selector = provider;
return this;
}
@@ -87,7 +87,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor, boolean supportsWe
}

@Override
public void handle(HttpServerRequest request) {
public void handle(HttpServerRequest request, Map<String, Object> attachments) {
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);

// Encoding sanity check
@@ -98,7 +98,7 @@ public void handle(HttpServerRequest request) {
}

boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
Proxy proxy = new Proxy(proxyRequest, isWebSocket);
Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments);
proxy.sendRequest()
.recover(throwable -> {
log.trace("Error in sending the request", throwable);
@@ -121,22 +121,23 @@ private void end(ProxyRequest proxyRequest, int sc) {
.send();
}

private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
return selector.apply(proxiedRequest, client);
private Future<HttpClientRequest> resolveOrigin(ProxyContext context) {
return selector.apply(context, client);
}

private class Proxy implements ProxyContext {

private final ProxyRequest request;
private ProxyResponse response;
private final Map<String, Object> attachments = new HashMap<>();
private final Map<String, Object> attachments;
private final ListIterator<ProxyInterceptorEntry> filters;
private final boolean isWebSocket;

private Proxy(ProxyRequest request, boolean isWebSocket) {
private Proxy(ProxyRequest request, boolean isWebSocket, Map<String, Object> attachments) {
this.request = request;
this.isWebSocket = isWebSocket;
this.filters = interceptors.listIterator();
this.attachments = attachments;
}

@Override
@@ -171,7 +172,7 @@ public Future<ProxyResponse> sendRequest() {
} else {
if (isWebSocket) {
HttpServerRequest proxiedRequest = request().proxiedRequest();
return resolveOrigin(proxiedRequest).compose(request -> {
return resolveOrigin(this).compose(request -> {
request.setMethod(request().getMethod());
request.setURI(request().getURI());
// Firefox is known to send an unexpected connection header value
@@ -237,7 +238,7 @@ public Future<Void> sendResponse() {
}

private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send);
return resolveOrigin(this).compose(proxyRequest::send);
}

private Future<Void> sendProxyResponse(ProxyResponse response) {
141 changes: 141 additions & 0 deletions src/test/java/io/vertx/tests/ProxyContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.vertx.tests;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.*;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.httpproxy.*;
import org.junit.Test;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ProxyContextTest extends ProxyTestBase {

private WebSocketClient wsClient;

public ProxyContextTest(ProxyOptions options) {
super(options);
}

@Override
public void tearDown(TestContext context) {
super.tearDown(context);
wsClient = null;
}

// same in TestBase, but allow to attach contexts
private Closeable startProxy(Consumer<HttpProxy> config, Map<String, Object> attachments) {
CompletableFuture<Closeable> res = new CompletableFuture<>();
vertx.deployVerticle(new AbstractVerticle() {
HttpClient proxyClient;
HttpServer proxyServer;
HttpProxy proxy;
@Override
public void start(Promise<Void> startFuture) {
proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions));
proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions));
proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient);
config.accept(proxy);
proxyServer.requestHandler(request -> {
proxy.handle(request, attachments);
});
proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty()));
}
}).onComplete(ar -> {
if (ar.succeeded()) {
String id = ar.result();
res.complete(() -> {
CountDownLatch latch = new CountDownLatch(1);
vertx.undeploy(id).onComplete(ar2 -> latch.countDown());
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
});
} else {
res.completeExceptionally(ar.cause());
}
});
try {
return res.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
} catch (ExecutionException e) {
throw new AssertionError(e.getMessage());
} catch (TimeoutException e) {
throw new AssertionError(e);
}
}

@Test
public void testOriginSelector(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.response().end("end");
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}

@Test
public void testOriginSelectorWebSocket(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
req.toWebSocket().onSuccess(ws -> {
ws.handler(ws::write);
});
});
startProxy(proxy -> {
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
}, new HashMap<>(Map.of("backend", backend)));
wsClient = vertx.createWebSocketClient();
wsClient.connect(8080, "localhost", "/")
.onComplete(ctx.asyncAssertSuccess(ws -> {
latch.complete();
}));
}

@Test
public void testInterceptor(TestContext ctx) {
Async latch = ctx.async();
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
if (!req.uri().equals("/new-uri")) {
req.response().setStatusCode(404).end();
}
req.response().end("end");
});
startProxy(proxy -> {
proxy.origin(backend)
.addInterceptor(new ProxyInterceptor() {
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
context.request().setURI(context.get("uri", String.class));
return context.sendRequest();
}
});
}, new HashMap<>(Map.of("uri", "/new-uri")));
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(HttpClientRequest::send)
.onComplete(ctx.asyncAssertSuccess(resp -> {
ctx.assertEquals(resp.statusCode(), 200);
latch.complete();
}));
}
}
1 change: 1 addition & 0 deletions src/test/java/io/vertx/tests/WebSocketCacheTest.java
Original file line number Diff line number Diff line change
@@ -48,3 +48,4 @@ public void testWsWithCache(TestContext ctx) {
latch.awaitSuccess(20_000);
}
}