Skip to content

Commit c67e96d

Browse files
authored
feat: implement client-side keepalive (#25)
1 parent 06a22ab commit c67e96d

File tree

6 files changed

+63
-18
lines changed

6 files changed

+63
-18
lines changed

examples/cli.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
#[macro_use]
2+
extern crate log;
3+
14
use rsocket_rust::prelude::*;
5+
use rsocket_rust::Result;
26
use rsocket_rust_transport_tcp::TcpClientTransport;
3-
use std::error::Error;
47

58
#[tokio::main]
6-
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
9+
async fn main() -> Result<()> {
10+
env_logger::builder().format_timestamp_millis().init();
711
let client = RSocketFactory::connect()
812
.transport(TcpClientTransport::from("127.0.0.1:7878"))
913
.acceptor(Box::new(|| {
@@ -15,8 +19,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1519
.expect("Connect failed!");
1620

1721
let req = Payload::builder().set_data_utf8("Ping!").build();
18-
let res = client.request_response(req).await.expect("Requet failed!");
19-
println!("request success: response={:?}", res);
22+
23+
match client.request_response(req).await {
24+
Ok(res) => info!("{:?}", res),
25+
Err(e) => error!("{}", e),
26+
}
2027

2128
Ok(())
2229
}

examples/echo.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ extern crate log;
44
use clap::{App, Arg, SubCommand};
55
use rsocket_rust::prelude::*;
66
use rsocket_rust::transport::Connection;
7+
use rsocket_rust::Result;
78
use rsocket_rust_transport_tcp::{
89
TcpClientTransport, TcpServerTransport, UnixClientTransport, UnixServerTransport,
910
};
1011
use rsocket_rust_transport_websocket::{WebsocketClientTransport, WebsocketServerTransport};
1112
use std::fs;
1213

13-
type Result<T> = rsocket_rust::Result<T>;
14-
1514
enum RequestMode {
1615
FNF,
1716
REQUEST,

examples/proxy.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ extern crate log;
33

44
use futures::executor::block_on;
55
use rsocket_rust::prelude::*;
6+
use rsocket_rust::Result;
67
use rsocket_rust_transport_tcp::*;
78

8-
type Result<T> = rsocket_rust::Result<T>;
9-
109
#[tokio::main]
1110
async fn main() -> Result<()> {
1211
env_logger::builder().format_timestamp_millis().init();

examples/qps.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ extern crate log;
44
use clap::{App, Arg};
55
use rsocket_rust::prelude::*;
66
use rsocket_rust::transport::{Connection, Transport};
7+
use rsocket_rust::Result;
78
use rsocket_rust_transport_tcp::{TcpClientTransport, UnixClientTransport};
89
use rsocket_rust_transport_websocket::WebsocketClientTransport;
9-
use std::error::Error;
1010
use std::sync::{
1111
atomic::{AtomicU32, Ordering},
1212
Arc,
@@ -20,7 +20,7 @@ async fn connect<A, B>(
2020
count: u32,
2121
payload_size: usize,
2222
notify: Arc<Notify>,
23-
) -> Result<(), Box<dyn Error + Send + Sync>>
23+
) -> Result<()>
2424
where
2525
A: Send + Sync + Transport<Conn = B> + 'static,
2626
B: Send + Sync + Connection + 'static,
@@ -57,7 +57,7 @@ where
5757
}
5858

5959
#[tokio::main]
60-
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
60+
async fn main() -> Result<()> {
6161
env_logger::builder().format_timestamp_millis().init();
6262

6363
let cli = App::new("echo")

rsocket/src/core/client.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::error::RSocketError;
1+
use crate::error::{RSocketError, ERR_CONN_CLOSED};
22
use crate::frame::{self, Frame};
33
use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
44
use crate::runtime;
@@ -7,7 +7,7 @@ use crate::transport::{
77
self, Acceptor, Connection, DuplexSocket, Reader, Splitter, Transport, Writer,
88
};
99
use crate::Result;
10-
use futures::{future, FutureExt, SinkExt, StreamExt};
10+
use futures::{future, select, FutureExt, SinkExt, StreamExt};
1111
use std::error::Error;
1212
use std::net::SocketAddr;
1313
use std::pin::Pin;
@@ -115,6 +115,7 @@ where
115115
};
116116

117117
let (snd_tx, mut snd_rx) = mpsc::channel::<Frame>(super::CHANNEL_SIZE);
118+
let cloned_snd_tx = snd_tx.clone();
118119
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;
119120

120121
let mut cloned_socket = socket.clone();
@@ -126,15 +127,40 @@ where
126127
let conn = tp.connect().await?;
127128
let (mut sink, mut stream) = conn.split();
128129

130+
let setup = self.setup.build();
131+
132+
// begin write loop
133+
let tick_period = setup.keepalive_interval();
129134
runtime::spawn(async move {
130-
while let Some(frame) = snd_rx.next().await {
131-
if let Err(e) = (&mut sink).write(frame).await {
132-
error!("write frame failed: {}", e);
133-
break;
135+
loop {
136+
// send keepalive if timeout
137+
match tokio::time::timeout(tick_period, snd_rx.next()).await {
138+
Ok(Some(frame)) => {
139+
if let frame::Body::Error(e) = frame.get_body_ref() {
140+
if e.get_code() == ERR_CONN_CLOSED {
141+
break;
142+
}
143+
}
144+
if let Err(e) = (&mut sink).write(frame).await {
145+
error!("write frame failed: {}", e);
146+
break;
147+
}
148+
}
149+
Ok(None) => break,
150+
Err(_) => {
151+
// keepalive
152+
let keepalive_frame =
153+
frame::Keepalive::builder(0, Frame::FLAG_RESPOND).build();
154+
if let Err(e) = (&mut sink).write(keepalive_frame).await {
155+
error!("write frame failed: {}", e);
156+
break;
157+
}
158+
}
134159
}
135160
}
136161
});
137162

163+
// begin read loop
138164
let closer = self.closer.take();
139165
runtime::spawn(async move {
140166
while let Some(next) = stream.read().await {
@@ -151,12 +177,22 @@ where
151177
}
152178
}
153179
}
180+
181+
// workaround: send a notify frame that the connection has been closed.
182+
let close_frame = frame::Error::builder(0, 0)
183+
.set_code(ERR_CONN_CLOSED)
184+
.build();
185+
if let Err(_) = cloned_snd_tx.send(close_frame).await {
186+
debug!("send close notify frame failed!");
187+
}
188+
189+
// invoke on_close handler
154190
if let Some(mut invoke) = closer {
155191
invoke();
156192
}
157193
});
158194

159-
socket.setup(self.setup.build()).await;
195+
socket.setup(setup).await;
160196
Ok(Client::from(socket))
161197
}
162198
}

rsocket/src/frame/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ impl Frame {
187187
self.body
188188
}
189189

190+
pub fn get_body_ref(&self) -> &Body {
191+
&self.body
192+
}
193+
190194
pub fn get_flag(&self) -> u16 {
191195
self.flag
192196
}

0 commit comments

Comments
 (0)