Skip to content

Commit 697fdfd

Browse files
storage: adapt Netty Reactor HTTP client as GCS storage client
Notes: * Uses direct memory buffers. Recommend to run Diskless with "-Dio.netty.maxDirectMemory=0" to have the Netty cleaner running. * Has static 96 max connections pool. * Has static 32 worker thread pool. * "SO_KEEPALIVE" set for sockets and keep alive header for HTTP. * Compression disabled, producer compression recommended and compressing again likely not beneficial. * GCS client handles redirects, Netty Reactor client following disabled. * Can use static BoringSSL library to offload SSL to OpenSSL. * Zero-copy until the response handling where direct memory buffer bytes are copied to heap manager byte array.
1 parent 3eae17d commit 697fdfd

File tree

6 files changed

+305
-1
lines changed

6 files changed

+305
-1
lines changed

build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,6 +2442,13 @@ project(':storage:inkless') {
24422442
exclude group: "com.fasterxml.jackson.core"
24432443
exclude group: "org.slf4j"
24442444
}
2445+
2446+
implementation(libs.nettyReactorCore)
2447+
implementation(libs.nettyReactorHttp)
2448+
implementation(libs.nettyTcNativeBoringSSLStatic)
2449+
implementation(libs.nettyTcNativeBoringSSLStaticLinuxAarch64)
2450+
implementation(libs.nettyTcNativeBoringSSLStaticLinuxX86_64)
2451+
24452452
implementation(libs.gcsSdk) {
24462453
exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
24472454
exclude group: 'org.checkerframework', module: 'checker-qual'

gradle/dependencies.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ versions += [
119119
mavenArtifact: "3.9.6",
120120
metrics: "2.2.0",
121121
mockito: "5.14.2",
122+
// Required to be compatible with BoringSSL version
123+
nettyReactor: "1.1.22",
124+
nettyTcNativeBoringSSL: "2.0.65.Final",
122125
opentelemetryProto: "1.3.2-alpha",
123126
postgresql: "42.7.4",
124127
protobuf: "3.25.5", // a dependency of opentelemetryProto
@@ -238,6 +241,11 @@ libs += [
238241
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
239242
mockitoCore: "org.mockito:mockito-core:$versions.mockito",
240243
mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
244+
nettyReactorCore: "io.projectreactor.netty:reactor-netty-core:$versions.nettyReactor",
245+
nettyReactorHttp: "io.projectreactor.netty:reactor-netty-http:$versions.nettyReactor",
246+
nettyTcNativeBoringSSLStatic: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL",
247+
nettyTcNativeBoringSSLStaticLinuxAarch64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-aarch_64",
248+
nettyTcNativeBoringSSLStaticLinuxX86_64: "io.netty:netty-tcnative-boringssl-static:$versions.nettyTcNativeBoringSSL:linux-x86_64",
241249
pcollections: "org.pcollections:pcollections:$versions.pcollections",
242250
opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
243251
postgresql: "org.postgresql:postgresql:$versions.postgresql",

storage/inkless/src/main/java/io/aiven/inkless/storage_backend/gcs/GcsStorage.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package io.aiven.inkless.storage_backend.gcs;
2020

21+
import com.google.api.client.http.HttpTransport;
2122
import com.google.cloud.BaseServiceException;
2223
import com.google.cloud.ReadChannel;
2324
import com.google.cloud.http.HttpTransportOptions;
@@ -34,6 +35,7 @@
3435
import java.nio.channels.ReadableByteChannel;
3536
import java.util.Map;
3637
import java.util.Objects;
38+
import java.util.Optional;
3739
import java.util.Set;
3840
import java.util.stream.Collectors;
3941

@@ -43,6 +45,8 @@
4345
import io.aiven.inkless.storage_backend.common.KeyNotFoundException;
4446
import io.aiven.inkless.storage_backend.common.StorageBackend;
4547
import io.aiven.inkless.storage_backend.common.StorageBackendException;
48+
import io.aiven.inkless.storage_backend.gcs.nettyhttpclient.ReactorNettyTransport;
49+
4650

4751
@CoverageIgnore // tested on integration level
4852
public class GcsStorage implements StorageBackend {
@@ -60,8 +64,12 @@ public class GcsStorage implements StorageBackend {
6064
public void configure(final Map<String, ?> configs) {
6165
final GcsStorageConfig config = new GcsStorageConfig(configs);
6266
this.bucketName = config.bucketName();
67+
final String gcsUrl = Optional.ofNullable(config.endpointUrl()).orElse("https://www.googleapis.com");
6368

64-
final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder();
69+
final HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder()
70+
.setHttpTransportFactory(() -> {
71+
return (HttpTransport) ReactorNettyTransport.get(gcsUrl);
72+
});
6573

6674
final StorageOptions.Builder builder = StorageOptions.newBuilder()
6775
.setCredentials(config.credentials())
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;
2+
3+
import com.google.api.client.http.LowLevelHttpRequest;
4+
import com.google.api.client.http.LowLevelHttpResponse;
5+
6+
import java.io.IOException;
7+
import java.net.URI;
8+
9+
import io.netty.buffer.ByteBuf;
10+
import io.netty.buffer.ByteBufOutputStream;
11+
import io.netty.handler.codec.http.DefaultHttpHeaders;
12+
import io.netty.handler.codec.http.HttpHeaderNames;
13+
import io.netty.handler.codec.http.HttpHeaders;
14+
import reactor.core.publisher.Flux;
15+
import reactor.netty.ByteBufFlux;
16+
import reactor.netty.http.client.HttpClient;
17+
18+
import static io.netty.buffer.Unpooled.wrappedBuffer;
19+
20+
public class ReactorNettyRequest extends LowLevelHttpRequest {
21+
22+
private HttpClient client;
23+
private final String method;
24+
private final URI uri;
25+
private final HttpHeaders headers = new DefaultHttpHeaders();
26+
27+
public ReactorNettyRequest(HttpClient client, String method, String url) {
28+
this.client = client;
29+
this.method = method;
30+
this.uri = URI.create(url);
31+
}
32+
33+
@Override
34+
public void addHeader(String name, String value) {
35+
headers.add(name, value);
36+
}
37+
38+
@Override
39+
public LowLevelHttpResponse execute() throws IOException {
40+
final HttpClient.ResponseReceiver<?> receiver;
41+
final ByteBuf buffer;
42+
switch (method) {
43+
case "POST":
44+
case "PUT":
45+
case "DELETE":
46+
if (getStreamingContent() != null) {
47+
long contentLength = getContentLength();
48+
buffer = wrappedBuffer(new byte[Math.toIntExact(contentLength)]);
49+
buffer.resetWriterIndex();
50+
try (ByteBufOutputStream out = new ByteBufOutputStream(buffer)) {
51+
getStreamingContent().writeTo(out);
52+
}
53+
54+
String contentType = getContentType();
55+
String contentEncoding = getContentEncoding();
56+
57+
if (contentType != null) {
58+
headers.set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType);
59+
}
60+
if (contentEncoding != null) {
61+
headers.set(HttpHeaderNames.CONTENT_ENCODING.toString(), contentEncoding);
62+
}
63+
if (contentLength >= 0) {
64+
headers.set(HttpHeaderNames.CONTENT_LENGTH.toString(), Math.toIntExact(contentLength));
65+
}
66+
} else {
67+
buffer = wrappedBuffer(new byte[0]);
68+
}
69+
70+
client = client.headers(cons -> {
71+
cons.add(headers);
72+
});
73+
74+
HttpClient.RequestSender sender;
75+
switch (method) {
76+
case "POST":
77+
sender = client.post();
78+
break;
79+
case "PUT":
80+
sender = client.put();
81+
break;
82+
case "DELETE":
83+
sender = client.delete();
84+
break;
85+
default:
86+
throw new RuntimeException("unknown method");
87+
}
88+
receiver = sender.uri(uri).send(ByteBufFlux.fromInbound(Flux.just(buffer)));
89+
break;
90+
case "GET":
91+
client = client.headers(cons -> {
92+
cons.add(headers);
93+
});
94+
receiver = client.get().uri(uri);
95+
break;
96+
default:
97+
throw new RuntimeException("Unsupported method " + method);
98+
}
99+
100+
final ReactorNettyResponse block = receiver.responseSingle((response, content) -> {
101+
return content.map(bb -> {
102+
// Buffer is directly allocated, copy bytes to heap and
103+
// allow the Reactor framework to release the direct buffer
104+
// and free the memory for reuse.
105+
final byte[] clone = new byte[bb.readableBytes()];
106+
bb.readBytes(clone);
107+
return new ReactorNettyResponse(response, clone);
108+
}).defaultIfEmpty(new ReactorNettyResponse(response, null));
109+
}).block();
110+
if (block == null) {
111+
throw new RuntimeException("No response received.");
112+
}
113+
return block;
114+
}
115+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;
2+
3+
import com.google.api.client.http.LowLevelHttpResponse;
4+
5+
import java.io.ByteArrayInputStream;
6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
9+
import io.netty.handler.codec.http.HttpHeaderNames;
10+
import reactor.netty.http.client.HttpClientResponse;
11+
12+
public class ReactorNettyResponse extends LowLevelHttpResponse {
13+
14+
private final HttpClientResponse response;
15+
private final byte[] content;
16+
17+
public ReactorNettyResponse(HttpClientResponse response, byte[] content) {
18+
this.response = response;
19+
if (content == null) {
20+
this.content = new byte[0];
21+
} else {
22+
this.content = content;
23+
}
24+
}
25+
26+
@Override
27+
public InputStream getContent() throws IOException {
28+
return new ByteArrayInputStream(content);
29+
}
30+
31+
@Override
32+
public String getContentEncoding() throws IOException {
33+
return response.responseHeaders().get(HttpHeaderNames.CONTENT_ENCODING);
34+
}
35+
36+
@Override
37+
public long getContentLength() throws IOException {
38+
return content.length;
39+
}
40+
41+
@Override
42+
public String getContentType() throws IOException {
43+
return response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE);
44+
}
45+
46+
@Override
47+
public String getStatusLine() throws IOException {
48+
final StringBuilder buf = new StringBuilder();
49+
50+
buf.append(response.version()).append(" ").append(getStatusCode()).append(" ");
51+
if (this.getReasonPhrase() != null) {
52+
buf.append(getReasonPhrase());
53+
}
54+
return buf.toString();
55+
}
56+
57+
@Override
58+
public int getStatusCode() throws IOException {
59+
return response.status().code();
60+
}
61+
62+
@Override
63+
public String getReasonPhrase() throws IOException {
64+
return response.status().reasonPhrase();
65+
}
66+
67+
@Override
68+
public int getHeaderCount() throws IOException {
69+
return response.responseHeaders().size();
70+
}
71+
72+
@Override
73+
public String getHeaderName(int index) throws IOException {
74+
return response.responseHeaders().entries().get(index).getKey();
75+
}
76+
77+
@Override
78+
public String getHeaderValue(int index) throws IOException {
79+
return response.responseHeaders().entries().get(index).getValue();
80+
}
81+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.aiven.inkless.storage_backend.gcs.nettyhttpclient;
2+
3+
import com.google.api.client.http.HttpTransport;
4+
import com.google.api.client.http.LowLevelHttpRequest;
5+
6+
import java.io.IOException;
7+
import java.net.URI;
8+
9+
import io.netty.channel.ChannelOption;
10+
import io.netty.handler.codec.http.HttpHeaderNames;
11+
import reactor.netty.http.client.HttpClient;
12+
import reactor.netty.resources.ConnectionProvider;
13+
import reactor.netty.resources.LoopResources;
14+
15+
public final class ReactorNettyTransport extends HttpTransport {
16+
17+
private static ReactorNettyTransport instance;
18+
19+
private final URI uri;
20+
private final HttpClient client;
21+
private final LoopResources clientEventLoopGroup;
22+
private final ConnectionProvider connectionProvider;
23+
private boolean shutdown = false;
24+
25+
private ReactorNettyTransport(final String endpoint) {
26+
uri = URI.create(endpoint);
27+
final int port;
28+
if (uri.getPort() == -1) {
29+
// No port defined, select by the scheme
30+
port = "https://".equals(uri.getScheme()) ? 80 : 443;
31+
} else {
32+
port = uri.getPort();
33+
}
34+
35+
connectionProvider = ConnectionProvider.builder("custom")
36+
.maxConnections(96)
37+
.build();
38+
39+
clientEventLoopGroup = LoopResources.create("gcs-netty-http", 32, true);
40+
41+
client = HttpClient.create(connectionProvider)
42+
.runOn(clientEventLoopGroup)
43+
.host(uri.getHost())
44+
.port(uri.getPort())
45+
.keepAlive(true)
46+
.followRedirect(false)
47+
.compress(false)
48+
.option(ChannelOption.SO_KEEPALIVE, true)
49+
.headers(cons -> cons.set(HttpHeaderNames.HOST, String.format("%s:%s", uri.getHost(), port)));
50+
client.warmup().block();
51+
}
52+
53+
@Override
54+
protected LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
55+
return new ReactorNettyRequest(client, method, url);
56+
}
57+
58+
@Override
59+
public void shutdown() throws IOException {
60+
if (!shutdown) {
61+
connectionProvider.disposeLater().block();
62+
clientEventLoopGroup.disposeLater().block();
63+
shutdown = true;
64+
}
65+
}
66+
67+
/**
68+
* Returns whether the transport is shutdown or not.
69+
*
70+
* @return true if the transport is shutdown.
71+
* @since 1.44.0
72+
*/
73+
@Override
74+
public boolean isShutdown() {
75+
return shutdown;
76+
}
77+
78+
public synchronized static ReactorNettyTransport get(String gcsUri) {
79+
if (instance == null) {
80+
instance = new ReactorNettyTransport(gcsUri);
81+
}
82+
return instance;
83+
}
84+
85+
}

0 commit comments

Comments
 (0)