From 0b3a9a52b59775767a6894d8988a00668bcac2e2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 11:13:34 +0200 Subject: [PATCH] ensures connection is close on keepalive timeout Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 13 +- .../io/rsocket/integration/KeepaliveTest.java | 178 ++++++++++++++++++ 2 files changed, 188 insertions(+), 3 deletions(-) create mode 100644 rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 9e8d349bf..fb2198a45 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -309,9 +309,16 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { tryTerminate( - () -> - new ConnectionErrorException( - String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()))); + () -> { + ConnectionErrorException exception = new ConnectionErrorException(String.format( + "No keep-alive acks for %d ms", + keepAlive.getTimeout() + .toMillis())); + + + getDuplexConnection().dispose(); + return exception; + }); } private void tryShutdown(Throwable e) { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java new file mode 100644 index 000000000..48257146d --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -0,0 +1,178 @@ +package io.rsocket.integration; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketClient; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpServer; +import reactor.test.StepVerifier; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + +public class KeepaliveTest { + + private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); + private static final int PORT = 23200; + + @Test + void keepAliveTest() { + createServer().block(); + RSocketClient rsocketClient = createClient(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap(i -> + rsocketClient.requestResponse(Mono.just(DefaultPayload.create(""))) + .doOnNext(__ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete() + )) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + @Test + void keepAliveTestLazy() { + createServer().block(); + Mono rsocketMono = createClientLazy(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap(i -> + rsocketMono.flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("")) + .doOnNext(__ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete() + ) + )) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + private static Mono createServer() { + LOG.info("Starting server at port {}", PORT); + + TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); + + return RSocketServer.create((setupPayload, rSocket) -> { + rSocket.onClose() + .doFirst(() -> LOG.info("Connected on server side.")) + .doOnTerminate(() -> LOG.info("Connection closed on server side.")) + .subscribe(); + + return Mono.just(new MyServerRsocket()); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(tcpServer)) + .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); + } + + private static RSocketClient createClient() { + LOG.info("Connecting...."); + + Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + Mono rsocketMono = RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + RSocketClient client = RSocketClient.from(rsocketMono); + + client + .source() + .doOnNext(r -> LOG.info("Got RSocket")) + .flatMap(RSocket::onClose) + .doOnError(err -> LOG.error("Error during onClose.", err)) + .retryWhen(reconnectSpec.apply("client-close")) + .doFirst(() -> LOG.info("Connected on client side.")) + .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + .repeat() + .subscribe(); + + return client; + } + + + private static Mono createClientLazy() { + LOG.info("Connecting...."); + + Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + return RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + +// RSocketClient client = RSocketClient.from(rsocketMono); + +// client +// .source() +// .doOnNext(r -> LOG.info("Got RSocket")) +// .flatMap(RSocket::onClose) +// .doOnError(err -> LOG.error("Error during onClose.", err)) +// .retryWhen(reconnectSpec.apply("client-close")) +// .doFirst(() -> LOG.info("Connected on client side.")) +// .doOnTerminate(() -> LOG.info("Connection closed on client side.")) +// .repeat() +// .subscribe(); + +// return client; + } + + public static class MyServerRsocket implements RSocket { + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just("Pong").map(DefaultPayload::create); + } + } +} \ No newline at end of file