Skip to content

Commit

Permalink
Configurable TcpClient for ReactorNettyTcpClient
Browse files Browse the repository at this point in the history
Issue: SPR-17523
  • Loading branch information
rstoyanchev committed Nov 21, 2018
1 parent fef0e21 commit 24848ec
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,41 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;

this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}

/**
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;

this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}

/**
* Constructor with an externally created {@link TcpClient} instance whose
* lifecycle is expected to be managed externally.
*
* @param tcpClient the TcpClient instance to use
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
Expand Down
17 changes: 6 additions & 11 deletions src/docs/asciidoc/web/websocket.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an
re-established. For example, a Stock Quote service that broadcasts stock quotes can
stop trying to send messages when there is no active "`system`" connection.

By default, the STOMP broker relay always connects (and reconnects as needed if
connectivity is lost) to the same host and port. If you wish to supply multiple addresses,
By default, the STOMP broker relay always connects, and reconnects as needed if
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
on each attempt to connect, you can configure a supplier of addresses, instead of a
fixed host and port. The following example shows how to do so:
fixed host and port. The following example shows how to do that:

====
[source,java,indent=0]
Expand All @@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
// Select address to connect to ...
});
};
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
----
Expand Down

0 comments on commit 24848ec

Please sign in to comment.