diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index fb2198a45..26f052780 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -310,11 +310,9 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { tryTerminate( () -> { - ConnectionErrorException exception = new ConnectionErrorException(String.format( - "No keep-alive acks for %d ms", - keepAlive.getTimeout() - .toMillis())); - + ConnectionErrorException exception = + new ConnectionErrorException( + String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())); getDuplexConnection().dispose(); return exception; diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index 48257146d..48d70f654 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -1,9 +1,5 @@ package io.rsocket.integration; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketClient; @@ -14,6 +10,9 @@ import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,152 +26,163 @@ public class KeepaliveTest { - private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); - private static final int PORT = 23200; - - @Test - void keepAliveTest() { - createServer().block(); - RSocketClient rsocketClient = createClient(); - - int expectedCount = 4; - AtomicBoolean sleepOnce = new AtomicBoolean(true); - StepVerifier.create( - Flux.range(0, expectedCount) - .delayElements(Duration.ofMillis(2000)) - .concatMap(i -> - rsocketClient.requestResponse(Mono.just(DefaultPayload.create(""))) - .doOnNext(__ -> { - if (sleepOnce.getAndSet(false)) { - try { - LOG.info("Sleeping..."); - Thread.sleep(1_000); - LOG.info("Waking up."); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }) - .log("id " + i) - .onErrorComplete() - )) - .expectSubscription() - .expectNextCount(expectedCount) - .verifyComplete(); - } - - @Test - void keepAliveTestLazy() { - createServer().block(); - Mono rsocketMono = createClientLazy(); - - int expectedCount = 4; - AtomicBoolean sleepOnce = new AtomicBoolean(true); - StepVerifier.create( - Flux.range(0, expectedCount) - .delayElements(Duration.ofMillis(2000)) - .concatMap(i -> - rsocketMono.flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("")) - .doOnNext(__ -> { - if (sleepOnce.getAndSet(false)) { - try { - LOG.info("Sleeping..."); - Thread.sleep(1_000); - LOG.info("Waking up."); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }) - .log("id " + i) - .onErrorComplete() - ) - )) - .expectSubscription() - .expectNextCount(expectedCount) - .verifyComplete(); - } - - private static Mono createServer() { - LOG.info("Starting server at port {}", PORT); - - TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); - - return RSocketServer.create((setupPayload, rSocket) -> { - rSocket.onClose() - .doFirst(() -> LOG.info("Connected on server side.")) - .doOnTerminate(() -> LOG.info("Connection closed on server side.")) - .subscribe(); - - return Mono.just(new MyServerRsocket()); - }) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(TcpServerTransport.create(tcpServer)) - .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); - } - - private static RSocketClient createClient() { - LOG.info("Connecting...."); - - Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) - .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); - - Mono rsocketMono = RSocketConnector.create() - .fragment(16384) - .reconnect(reconnectSpec.apply("connector-close")) - .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) - .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); - - RSocketClient client = RSocketClient.from(rsocketMono); - - client - .source() - .doOnNext(r -> LOG.info("Got RSocket")) - .flatMap(RSocket::onClose) - .doOnError(err -> LOG.error("Error during onClose.", err)) - .retryWhen(reconnectSpec.apply("client-close")) - .doFirst(() -> LOG.info("Connected on client side.")) - .doOnTerminate(() -> LOG.info("Connection closed on client side.")) - .repeat() - .subscribe(); - - return client; - } - - - private static Mono createClientLazy() { - LOG.info("Connecting...."); - - Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) - .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); - - return RSocketConnector.create() - .fragment(16384) - .reconnect(reconnectSpec.apply("connector-close")) - .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) - .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); - -// RSocketClient client = RSocketClient.from(rsocketMono); - -// client -// .source() -// .doOnNext(r -> LOG.info("Got RSocket")) -// .flatMap(RSocket::onClose) -// .doOnError(err -> LOG.error("Error during onClose.", err)) -// .retryWhen(reconnectSpec.apply("client-close")) -// .doFirst(() -> LOG.info("Connected on client side.")) -// .doOnTerminate(() -> LOG.info("Connection closed on client side.")) -// .repeat() -// .subscribe(); - -// return client; - } - - public static class MyServerRsocket implements RSocket { - - @Override - public Mono requestResponse(Payload payload) { - return Mono.just("Pong").map(DefaultPayload::create); - } - } -} \ No newline at end of file + private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); + private static final int PORT = 23200; + + @Test + void keepAliveTest() { + createServer().block(); + RSocketClient rsocketClient = createClient(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap( + i -> + rsocketClient + .requestResponse(Mono.just(DefaultPayload.create(""))) + .doOnNext( + __ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete())) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + @Test + void keepAliveTestLazy() { + createServer().block(); + Mono rsocketMono = createClientLazy(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap( + i -> + rsocketMono.flatMap( + rsocket -> + rsocket + .requestResponse(DefaultPayload.create("")) + .doOnNext( + __ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete()))) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + private static Mono createServer() { + LOG.info("Starting server at port {}", PORT); + + TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); + + return RSocketServer.create( + (setupPayload, rSocket) -> { + rSocket + .onClose() + .doFirst(() -> LOG.info("Connected on server side.")) + .doOnTerminate(() -> LOG.info("Connection closed on server side.")) + .subscribe(); + + return Mono.just(new MyServerRsocket()); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(tcpServer)) + .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); + } + + private static RSocketClient createClient() { + LOG.info("Connecting...."); + + Function reconnectSpec = + reason -> + Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + Mono rsocketMono = + RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + RSocketClient client = RSocketClient.from(rsocketMono); + + client + .source() + .doOnNext(r -> LOG.info("Got RSocket")) + .flatMap(RSocket::onClose) + .doOnError(err -> LOG.error("Error during onClose.", err)) + .retryWhen(reconnectSpec.apply("client-close")) + .doFirst(() -> LOG.info("Connected on client side.")) + .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + .repeat() + .subscribe(); + + return client; + } + + private static Mono createClientLazy() { + LOG.info("Connecting...."); + + Function reconnectSpec = + reason -> + Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + return RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + // RSocketClient client = RSocketClient.from(rsocketMono); + + // client + // .source() + // .doOnNext(r -> LOG.info("Got RSocket")) + // .flatMap(RSocket::onClose) + // .doOnError(err -> LOG.error("Error during onClose.", err)) + // .retryWhen(reconnectSpec.apply("client-close")) + // .doFirst(() -> LOG.info("Connected on client side.")) + // .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + // .repeat() + // .subscribe(); + + // return client; + } + + public static class MyServerRsocket implements RSocket { + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just("Pong").map(DefaultPayload::create); + } + } +} diff --git a/rsocket-transport-netty/src/test/resources/logback-test.xml b/rsocket-transport-netty/src/test/resources/logback-test.xml index b42db6df6..981d6d0b6 100644 --- a/rsocket-transport-netty/src/test/resources/logback-test.xml +++ b/rsocket-transport-netty/src/test/resources/logback-test.xml @@ -27,7 +27,6 @@ -