Skip to content

Commit 0b20ef5

Browse files
committed
Do not hang in poll if reactor is destroyed
If channel is dropped, receiver may still return EOF, and if channel is alive, receiver produces an error.
1 parent 717e99c commit 0b20ef5

File tree

12 files changed

+152
-53
lines changed

12 files changed

+152
-53
lines changed

src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl<T> Stream for Receiver<T> {
114114
match self.rx.get_ref().try_recv() {
115115
Ok(t) => Ok(Async::Ready(Some(t))),
116116
Err(TryRecvError::Empty) => {
117-
self.rx.need_read();
117+
try!(self.rx.need_read());
118118
Ok(Async::NotReady)
119119
}
120120
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),

src/net/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl TcpListener {
7373
match self.io.get_ref().accept() {
7474
Err(e) => {
7575
if e.kind() == io::ErrorKind::WouldBlock {
76-
self.io.need_read();
76+
try!(self.io.need_read());
7777
}
7878
return Err(e)
7979
},
@@ -87,7 +87,7 @@ impl TcpListener {
8787
});
8888
tx.complete(res);
8989
Ok(())
90-
});
90+
}).expect("failed to spawn");
9191
self.pending_accept = Some(rx);
9292
// continue to polling the `rx` at the beginning of the loop
9393
}

src/net/udp/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl UdpSocket {
106106
match self.io.get_ref().send_to(buf, target) {
107107
Ok(Some(n)) => Ok(n),
108108
Ok(None) => {
109-
self.io.need_write();
109+
try!(self.io.need_write());
110110
Err(mio::would_block())
111111
}
112112
Err(e) => Err(e),
@@ -149,7 +149,7 @@ impl UdpSocket {
149149
match self.io.get_ref().recv_from(buf) {
150150
Ok(Some(n)) => Ok(n),
151151
Ok(None) => {
152-
self.io.need_read();
152+
try!(self.io.need_read());
153153
Err(mio::would_block())
154154
}
155155
Err(e) => Err(e),

src/reactor/channel.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,35 @@ use std::cell::Cell;
1010
use std::io;
1111
use std::marker;
1212
use std::sync::Arc;
13+
use std::sync::atomic::AtomicBool;
14+
use std::sync::atomic::Ordering;
1315

1416
use mio;
1517
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
1618

1719
use mpsc_queue::{Queue, PopResult};
1820

21+
struct Inner<T> {
22+
queue: Queue<T>,
23+
receiver_alive: AtomicBool,
24+
}
25+
1926
pub struct Sender<T> {
2027
ctl: SenderCtl,
21-
inner: Arc<Queue<T>>,
28+
inner: Arc<Inner<T>>,
2229
}
2330

2431
pub struct Receiver<T> {
2532
ctl: ReceiverCtl,
26-
inner: Arc<Queue<T>>,
33+
inner: Arc<Inner<T>>,
2734
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
2835
}
2936

3037
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
31-
let inner = Arc::new(Queue::new());
38+
let inner = Arc::new(Inner {
39+
queue: Queue::new(),
40+
receiver_alive: AtomicBool::new(true),
41+
});
3242
let (tx, rx) = ctl_pair();
3343

3444
let tx = Sender {
@@ -45,7 +55,10 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
4555

4656
impl<T> Sender<T> {
4757
pub fn send(&self, data: T) -> io::Result<()> {
48-
self.inner.push(data);
58+
if !self.inner.receiver_alive.load(Ordering::SeqCst) {
59+
return Err(io::Error::new(io::ErrorKind::Other, "receiver has been closed"));
60+
}
61+
self.inner.queue.push(data);
4962
self.ctl.inc()
5063
}
5164
}
@@ -57,7 +70,7 @@ impl<T> Receiver<T> {
5770
//
5871
// We, however, are the only thread with a `Receiver<T>` because this
5972
// type is not `Sync`. and we never handed out another instance.
60-
match unsafe { self.inner.pop() } {
73+
match unsafe { self.inner.queue.pop() } {
6174
PopResult::Data(t) => {
6275
try!(self.ctl.dec());
6376
Ok(Some(t))
@@ -85,6 +98,13 @@ impl<T> Receiver<T> {
8598
}
8699
}
87100

101+
// Close receiver, so further send operations would fail.
102+
// This function is used internally in `Core` and is not exposed as public API.
103+
pub fn close_receiver<T>(receiver: &Receiver<T>) {
104+
receiver.inner.as_ref().receiver_alive.store(false, Ordering::SeqCst);
105+
}
106+
107+
88108
// Just delegate everything to `self.ctl`
89109
impl<T> mio::Evented for Receiver<T> {
90110
fn register(&self,
@@ -108,6 +128,12 @@ impl<T> mio::Evented for Receiver<T> {
108128
}
109129
}
110130

131+
impl<T> Drop for Receiver<T> {
132+
fn drop(&mut self) {
133+
close_receiver(self);
134+
}
135+
}
136+
111137
impl<T> Clone for Sender<T> {
112138
fn clone(&self) -> Sender<T> {
113139
Sender {

src/reactor/interval.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,19 @@ impl Stream for Interval {
6666
let now = Instant::now();
6767
if self.next <= now {
6868
self.next = next_interval(self.next, now, self.interval);
69-
self.token.reset_timeout(self.next, &self.handle);
69+
try!(self.token.reset_timeout(self.next, &self.handle));
7070
Ok(Async::Ready(Some(())))
7171
} else {
72-
self.token.update_timeout(&self.handle);
72+
try!(self.token.update_timeout(&self.handle));
7373
Ok(Async::NotReady)
7474
}
7575
}
7676
}
7777

7878
impl Drop for Interval {
7979
fn drop(&mut self) {
80-
self.token.cancel_timeout(&self.handle);
80+
// Ignore error
81+
drop(self.token.cancel_timeout(&self.handle));
8182
}
8283
}
8384

src/reactor/io_token.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ impl IoToken {
7171
/// receive further notifications it will need to call `schedule_read`
7272
/// again.
7373
///
74+
/// This function returns an error if reactor is destroyed.
75+
///
7476
/// > **Note**: This method should generally not be used directly, but
7577
/// > rather the `ReadinessStream` type should be used instead.
7678
///
@@ -82,8 +84,8 @@ impl IoToken {
8284
///
8385
/// This function will also panic if there is not a currently running future
8486
/// task.
85-
pub fn schedule_read(&self, handle: &Remote) {
86-
handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
87+
pub fn schedule_read(&self, handle: &Remote) -> io::Result<()> {
88+
handle.send(Message::Schedule(self.token, task::park(), Direction::Read))
8789
}
8890

8991
/// Schedule the current future task to receive a notification when the
@@ -98,6 +100,8 @@ impl IoToken {
98100
/// receive further notifications it will need to call `schedule_write`
99101
/// again.
100102
///
103+
/// This function returns an error if reactor is destroyed.
104+
///
101105
/// > **Note**: This method should generally not be used directly, but
102106
/// > rather the `ReadinessStream` type should be used instead.
103107
///
@@ -109,8 +113,8 @@ impl IoToken {
109113
///
110114
/// This function will also panic if there is not a currently running future
111115
/// task.
112-
pub fn schedule_write(&self, handle: &Remote) {
113-
handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
116+
pub fn schedule_write(&self, handle: &Remote) -> io::Result<()> {
117+
handle.send(Message::Schedule(self.token, task::park(), Direction::Write))
114118
}
115119

116120
/// Unregister all information associated with a token on an event loop,
@@ -127,6 +131,8 @@ impl IoToken {
127131
/// ensure that the callbacks are **not** invoked, so pending scheduled
128132
/// callbacks cannot be relied upon to get called.
129133
///
134+
/// This function returns an error if reactor is destroyed.
135+
///
130136
/// > **Note**: This method should generally not be used directly, but
131137
/// > rather the `ReadinessStream` type should be used instead.
132138
///
@@ -135,7 +141,7 @@ impl IoToken {
135141
/// This function will panic if the event loop this handle is associated
136142
/// with has gone away, or if there is an error communicating with the event
137143
/// loop.
138-
pub fn drop_source(&self, handle: &Remote) {
139-
handle.send(Message::DropSource(self.token));
144+
pub fn drop_source(&self, handle: &Remote) -> io::Result<()> {
145+
handle.send(Message::DropSource(self.token))
140146
}
141147
}

src/reactor/mod.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,27 @@ impl Core {
412412
}
413413
}
414414

415+
impl Drop for Core {
416+
fn drop(&mut self) {
417+
// Close the receiver, so all schedule operations will be rejected.
418+
// Do it explicitly before unparking to avoid race condition.
419+
channel::close_receiver(&self.rx);
420+
421+
// Unpark all tasks.
422+
// It has no effect for tasks in this event loop,
423+
// however tasks in another executors get an error
424+
// when they do `poll` right after wakeup.
425+
for io in self.inner.borrow_mut().io_dispatch.iter_mut() {
426+
if let Some(ref mut reader) = io.reader {
427+
reader.unpark();
428+
}
429+
if let Some(ref mut writer) = io.writer {
430+
writer.unpark();
431+
}
432+
}
433+
}
434+
}
435+
415436
impl Inner {
416437
fn add_source(&mut self, source: &mio::Evented)
417438
-> io::Result<(Arc<AtomicUsize>, usize)> {
@@ -519,26 +540,20 @@ impl Inner {
519540
}
520541

521542
impl Remote {
522-
fn send(&self, msg: Message) {
543+
fn send(&self, msg: Message) -> io::Result<()> {
523544
self.with_loop(|lp| {
524545
match lp {
525546
Some(lp) => {
526547
// Need to execute all existing requests first, to ensure
527548
// that our message is processed "in order"
528549
lp.consume_queue();
529550
lp.notify(msg);
551+
Ok(())
530552
}
531553
None => {
532-
match self.tx.send(msg) {
533-
Ok(()) => {}
534-
535-
// This should only happen when there was an error
536-
// writing to the pipe to wake up the event loop,
537-
// hopefully that never happens
538-
Err(e) => {
539-
panic!("error sending message to event loop: {}", e)
540-
}
541-
}
554+
// May return an error if receiver is closed
555+
// or if there was an error writing to the pipe.
556+
self.tx.send(msg)
542557
}
543558
}
544559
})
@@ -569,15 +584,17 @@ impl Remote {
569584
///
570585
/// Note that while the closure, `F`, requires the `Send` bound as it might
571586
/// cross threads, the future `R` does not.
572-
pub fn spawn<F, R>(&self, f: F)
587+
///
588+
/// This function returns an error if reactor is destroyed.
589+
pub fn spawn<F, R>(&self, f: F) -> io::Result<()>
573590
where F: FnOnce(&Handle) -> R + Send + 'static,
574591
R: IntoFuture<Item=(), Error=()>,
575592
R::Future: 'static,
576593
{
577594
self.send(Message::Run(Box::new(|lp: &Core| {
578595
let f = f(&lp.handle());
579596
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
580-
})));
597+
})))
581598
}
582599
}
583600

0 commit comments

Comments
 (0)