Skip to content

Commit a8ce427

Browse files
committed
add general context data
1 parent 4379afe commit a8ce427

File tree

4 files changed

+197
-19
lines changed

4 files changed

+197
-19
lines changed

src/main/java/examples/HttpProxyExamples.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,20 @@ public void proxy(Vertx vertx) {
4646
proxyServer.requestHandler(proxy).listen(8080);
4747
}
4848

49-
private SocketAddress resolveOriginAddress(HttpServerRequest request) {
49+
private SocketAddress resolveOriginAddress(ProxyContext proxyContext) {
5050
return null;
5151
}
5252

5353
public void originSelector(HttpProxy proxy) {
54-
proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(request)));
54+
proxy.originSelector(proxyContext -> Future.succeededFuture(resolveOriginAddress(proxyContext)));
5555
}
5656

57-
private RequestOptions resolveOriginOptions(HttpServerRequest request) {
57+
private RequestOptions resolveOriginOptions(ProxyContext proxyContext) {
5858
return null;
5959
}
6060

6161
public void originRequestProvider(HttpProxy proxy) {
62-
proxy.originRequestProvider((request, client) -> client.request(resolveOriginOptions(request)));
62+
proxy.originRequestProvider((proxyContext, client) -> client.request(resolveOriginOptions(proxyContext)));
6363
}
6464

6565
public void inboundInterceptor(HttpProxy proxy) {

src/main/java/io/vertx/httpproxy/HttpProxy.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.vertx.core.net.SocketAddress;
2323
import io.vertx.httpproxy.impl.ReverseProxy;
2424

25+
import java.util.HashMap;
26+
import java.util.Map;
2527
import java.util.function.BiFunction;
2628
import java.util.function.Function;
2729

@@ -76,16 +78,40 @@ default HttpProxy origin(int port, String host) {
7678
return origin(SocketAddress.inetSocketAddress(port, host));
7779
}
7880

81+
// /**
82+
// * Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
83+
// *
84+
// * @param selector the selector
85+
// * @return a reference to this, so the API can be used fluently
86+
// */
87+
// @Fluent
88+
// default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
89+
// return originRequestProvider((req, client) -> selector
90+
// .apply(req)
91+
// .flatMap(server -> client.request(new RequestOptions().setServer(server))));
92+
// }
93+
//
94+
// /**
95+
// * Set a provider that creates the request to the <i><b>origin</b></i> server based the incoming HTTP request.
96+
// * Setting a provider overrides any origin selector previously set.
97+
// *
98+
// * @param provider the provider
99+
// * @return a reference to this, so the API can be used fluently
100+
// */
101+
// @GenIgnore()
102+
// @Fluent
103+
// HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
104+
79105
/**
80106
* Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
81107
*
82108
* @param selector the selector
83109
* @return a reference to this, so the API can be used fluently
84110
*/
85111
@Fluent
86-
default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
87-
return originRequestProvider((req, client) -> selector
88-
.apply(req)
112+
default HttpProxy originSelector(Function<ProxyContext, Future<SocketAddress>> selector) {
113+
return originRequestProvider((context, client) -> selector
114+
.apply(context)
89115
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
90116
}
91117

@@ -98,7 +124,7 @@ default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddres
98124
*/
99125
@GenIgnore()
100126
@Fluent
101-
HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
127+
HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider);
102128

103129
/**
104130
* Add an interceptor to the interceptor chain.
@@ -131,6 +157,16 @@ default HttpProxy addInterceptor(ProxyInterceptor interceptor) {
131157
*
132158
* @param request the outbound {@code HttpServerRequest}
133159
*/
134-
void handle(HttpServerRequest request);
160+
default void handle(HttpServerRequest request) {
161+
handle(request, new HashMap<>());
162+
}
163+
164+
/**
165+
* Handle the <i><b>outbound</b></i> {@code HttpServerRequest}.
166+
*
167+
* @param request the outbound {@code HttpServerRequest}
168+
* @param attachments the contextual data holder for {@code ProxyContext}. Must be mutable.
169+
*/
170+
void handle(HttpServerRequest request, Map<String, Object> attachments);
135171

136172
}

src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class ReverseProxy implements HttpProxy {
5151
private final static Logger log = LoggerFactory.getLogger(ReverseProxy.class);
5252
private final HttpClient client;
5353
private final boolean supportWebSocket;
54-
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
54+
private BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
5555
private final List<ProxyInterceptorEntry> interceptors = new ArrayList<>();
5656

5757
public ReverseProxy(ProxyOptions options, HttpClient client) {
@@ -75,7 +75,7 @@ public Cache newCache(CacheOptions options, Vertx vertx) {
7575
}
7676

7777
@Override
78-
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
78+
public HttpProxy originRequestProvider(BiFunction<ProxyContext, HttpClient, Future<HttpClientRequest>> provider) {
7979
selector = provider;
8080
return this;
8181
}
@@ -87,7 +87,7 @@ public HttpProxy addInterceptor(ProxyInterceptor interceptor, boolean supportsWe
8787
}
8888

8989
@Override
90-
public void handle(HttpServerRequest request) {
90+
public void handle(HttpServerRequest request, Map<String, Object> attachments) {
9191
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);
9292

9393
// Encoding sanity check
@@ -98,7 +98,7 @@ public void handle(HttpServerRequest request) {
9898
}
9999

100100
boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
101-
Proxy proxy = new Proxy(proxyRequest, isWebSocket);
101+
Proxy proxy = new Proxy(proxyRequest, isWebSocket, attachments);
102102
proxy.sendRequest()
103103
.recover(throwable -> {
104104
log.trace("Error in sending the request", throwable);
@@ -121,22 +121,23 @@ private void end(ProxyRequest proxyRequest, int sc) {
121121
.send();
122122
}
123123

124-
private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
125-
return selector.apply(proxiedRequest, client);
124+
private Future<HttpClientRequest> resolveOrigin(ProxyContext context) {
125+
return selector.apply(context, client);
126126
}
127127

128128
private class Proxy implements ProxyContext {
129129

130130
private final ProxyRequest request;
131131
private ProxyResponse response;
132-
private final Map<String, Object> attachments = new HashMap<>();
132+
private final Map<String, Object> attachments;
133133
private final ListIterator<ProxyInterceptorEntry> filters;
134134
private final boolean isWebSocket;
135135

136-
private Proxy(ProxyRequest request, boolean isWebSocket) {
136+
private Proxy(ProxyRequest request, boolean isWebSocket, Map<String, Object> attachments) {
137137
this.request = request;
138138
this.isWebSocket = isWebSocket;
139139
this.filters = interceptors.listIterator();
140+
this.attachments = attachments;
140141
}
141142

142143
@Override
@@ -171,7 +172,7 @@ public Future<ProxyResponse> sendRequest() {
171172
} else {
172173
if (isWebSocket) {
173174
HttpServerRequest proxiedRequest = request().proxiedRequest();
174-
return resolveOrigin(proxiedRequest).compose(request -> {
175+
return resolveOrigin(this).compose(request -> {
175176
request.setMethod(request().getMethod());
176177
request.setURI(request().getURI());
177178
// Firefox is known to send an unexpected connection header value
@@ -237,7 +238,7 @@ public Future<Void> sendResponse() {
237238
}
238239

239240
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
240-
return resolveOrigin(proxyRequest.proxiedRequest()).compose(proxyRequest::send);
241+
return resolveOrigin(this).compose(proxyRequest::send);
241242
}
242243

243244
private Future<Void> sendProxyResponse(ProxyResponse response) {
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.vertx.tests;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.Future;
5+
import io.vertx.core.Promise;
6+
import io.vertx.core.http.*;
7+
import io.vertx.core.net.SocketAddress;
8+
import io.vertx.ext.unit.Async;
9+
import io.vertx.ext.unit.TestContext;
10+
import io.vertx.httpproxy.*;
11+
import org.junit.Test;
12+
13+
import java.io.Closeable;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
import java.util.concurrent.*;
17+
import java.util.function.Consumer;
18+
19+
public class ProxyContextTest extends ProxyTestBase {
20+
21+
private WebSocketClient wsClient;
22+
23+
public ProxyContextTest(ProxyOptions options) {
24+
super(options);
25+
}
26+
27+
@Override
28+
public void tearDown(TestContext context) {
29+
super.tearDown(context);
30+
wsClient = null;
31+
}
32+
33+
// same in TestBase, but allow to attach contexts
34+
private Closeable startProxy(Consumer<HttpProxy> config, Map<String, Object> attachments) {
35+
CompletableFuture<Closeable> res = new CompletableFuture<>();
36+
vertx.deployVerticle(new AbstractVerticle() {
37+
HttpClient proxyClient;
38+
HttpServer proxyServer;
39+
HttpProxy proxy;
40+
@Override
41+
public void start(Promise<Void> startFuture) {
42+
proxyClient = vertx.createHttpClient(new HttpClientOptions(clientOptions));
43+
proxyServer = vertx.createHttpServer(new HttpServerOptions(serverOptions));
44+
proxy = HttpProxy.reverseProxy(proxyOptions, proxyClient);
45+
config.accept(proxy);
46+
proxyServer.requestHandler(request -> {
47+
proxy.handle(request, attachments);
48+
});
49+
proxyServer.listen().onComplete(ar -> startFuture.handle(ar.mapEmpty()));
50+
}
51+
}).onComplete(ar -> {
52+
if (ar.succeeded()) {
53+
String id = ar.result();
54+
res.complete(() -> {
55+
CountDownLatch latch = new CountDownLatch(1);
56+
vertx.undeploy(id).onComplete(ar2 -> latch.countDown());
57+
try {
58+
latch.await(10, TimeUnit.SECONDS);
59+
} catch (InterruptedException e) {
60+
Thread.currentThread().interrupt();
61+
throw new AssertionError(e);
62+
}
63+
});
64+
} else {
65+
res.completeExceptionally(ar.cause());
66+
}
67+
});
68+
try {
69+
return res.get(10, TimeUnit.SECONDS);
70+
} catch (InterruptedException e) {
71+
Thread.currentThread().interrupt();
72+
throw new AssertionError(e);
73+
} catch (ExecutionException e) {
74+
throw new AssertionError(e.getMessage());
75+
} catch (TimeoutException e) {
76+
throw new AssertionError(e);
77+
}
78+
}
79+
80+
@Test
81+
public void testOriginSelector(TestContext ctx) {
82+
Async latch = ctx.async();
83+
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
84+
req.response().end("end");
85+
});
86+
startProxy(proxy -> {
87+
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
88+
}, new HashMap<>(Map.of("backend", backend)));
89+
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
90+
.compose(HttpClientRequest::send)
91+
.onComplete(ctx.asyncAssertSuccess(resp -> {
92+
ctx.assertEquals(resp.statusCode(), 200);
93+
latch.complete();
94+
}));
95+
}
96+
97+
@Test
98+
public void testOriginSelectorWebSocket(TestContext ctx) {
99+
Async latch = ctx.async();
100+
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
101+
req.toWebSocket().onSuccess(ws -> {
102+
ws.handler(ws::write);
103+
});
104+
});
105+
startProxy(proxy -> {
106+
proxy.originSelector(context -> Future.succeededFuture(context.get("backend", SocketAddress.class)));
107+
}, new HashMap<>(Map.of("backend", backend)));
108+
wsClient = vertx.createWebSocketClient();
109+
wsClient.connect(8080, "localhost", "/")
110+
.onComplete(ctx.asyncAssertSuccess(ws -> {
111+
latch.complete();
112+
}));
113+
}
114+
115+
@Test
116+
public void testInterceptor(TestContext ctx) {
117+
Async latch = ctx.async();
118+
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
119+
if (!req.uri().equals("/new-uri")) {
120+
req.response().setStatusCode(404).end();
121+
}
122+
req.response().end("end");
123+
});
124+
startProxy(proxy -> {
125+
proxy.origin(backend)
126+
.addInterceptor(new ProxyInterceptor() {
127+
@Override
128+
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
129+
context.request().setURI(context.get("uri", String.class));
130+
return context.sendRequest();
131+
}
132+
});
133+
}, new HashMap<>(Map.of("uri", "/new-uri")));
134+
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
135+
.compose(HttpClientRequest::send)
136+
.onComplete(ctx.asyncAssertSuccess(resp -> {
137+
ctx.assertEquals(resp.statusCode(), 200);
138+
latch.complete();
139+
}));
140+
}
141+
}

0 commit comments

Comments
 (0)