Skip to content

Commit d62b636

Browse files
authored
feat: close connection correctly when client is dropped (#40)
1 parent a65a056 commit d62b636

File tree

2 files changed

+39
-20
lines changed

2 files changed

+39
-20
lines changed

rsocket/src/core/client.rs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::Result;
2222
pub struct Client {
2323
closed: Arc<Notify>,
2424
socket: DuplexSocket,
25+
closing: mpsc::Sender<()>,
2526
}
2627

2728
pub struct ClientBuilder<T, C> {
@@ -171,28 +172,41 @@ where
171172

172173
// begin read loop
173174
let closer = self.closer.take();
174-
let closed = Arc::new(Notify::new());
175-
let closed_clone = closed.clone();
175+
let close_notify = Arc::new(Notify::new());
176+
let close_notify_clone = close_notify.clone();
177+
let (closing, mut closing_rx) = mpsc::channel::<()>(1);
176178

177179
let (read_tx, mut read_rx) = mpsc::unbounded_channel::<Frame>();
178180

181+
// read frames from stream, then writes into channel
179182
runtime::spawn(async move {
180-
while let Some(next) = stream.next().await {
181-
match next {
182-
Ok(frame) => {
183-
if let Err(e) = read_tx.send(frame) {
184-
error!("read next frame failed: {}", e);
185-
break;
183+
loop {
184+
tokio::select! {
185+
res = stream.next() => {
186+
match res {
187+
Some(next) => match next {
188+
Ok(frame) => {
189+
if let Err(e) = read_tx.send(frame) {
190+
error!("forward frame failed: {}", e);
191+
break;
192+
}
193+
}
194+
Err(e) => {
195+
error!("read frame failed: {}", e);
196+
break;
197+
}
198+
}
199+
None => break,
186200
}
187201
}
188-
Err(e) => {
189-
error!("read next frame failed: {}", e);
190-
break;
202+
_ = closing_rx.recv() => {
203+
break
191204
}
192205
}
193206
}
194207
});
195208

209+
// process frames
196210
runtime::spawn(async move {
197211
while let Some(next) = read_rx.recv().await {
198212
if let Err(e) = cloned_socket.dispatch(next, None).await {
@@ -205,12 +219,12 @@ where
205219
let close_frame = frame::Error::builder(0, 0)
206220
.set_code(ERR_CONN_CLOSED)
207221
.build();
208-
if let Err(_) = cloned_snd_tx.send(close_frame) {
209-
debug!("send close notify frame failed!");
222+
if let Err(e) = cloned_snd_tx.send(close_frame) {
223+
debug!("send close notify frame failed: {}", e);
210224
}
211225

212226
// notify client closed
213-
closed_clone.notify_one();
227+
close_notify_clone.notify_one();
214228

215229
// invoke on_close handler
216230
if let Some(mut invoke) = closer {
@@ -219,13 +233,18 @@ where
219233
});
220234

221235
socket.setup(setup).await;
222-
Ok(Client::new(socket, closed))
236+
237+
Ok(Client::new(socket, close_notify, closing))
223238
}
224239
}
225240

226241
impl Client {
227-
fn new(socket: DuplexSocket, closed: Arc<Notify>) -> Client {
228-
Client { socket, closed }
242+
fn new(socket: DuplexSocket, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
243+
Client {
244+
socket,
245+
closed,
246+
closing,
247+
}
229248
}
230249

231250
pub async fn wait_for_close(self) {

rsocket/src/core/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,12 @@ where
133133
match reader.next().await {
134134
Some(Ok(frame)) => {
135135
if let Err(e) = read_tx.send(frame) {
136-
error!("read next frame failed: {}", e);
136+
error!("forward frame failed: {}", e);
137137
break;
138138
}
139139
}
140140
Some(Err(e)) => {
141-
error!("read next frame failed: {}", e);
141+
error!("read frame failed: {}", e);
142142
break;
143143
}
144144
None => {
@@ -150,7 +150,7 @@ where
150150

151151
while let Some(frame) = read_rx.recv().await {
152152
if let Err(e) = socket.dispatch(frame, acceptor.as_ref().as_ref()).await {
153-
error!("dispatch incoming frame failed: {}", e);
153+
error!("dispatch frame failed: {}", e);
154154
break;
155155
}
156156
}

0 commit comments

Comments
 (0)