Skip to content

Commit 3440120

Browse files
committed
Do not hang in poll if event loop is destroyed
`Loop` destructor marks all loop handles dead, so any further `poll` requests fail (they currently panic, but should probably return an error). Also `Loop` destructor unparks all tasks waiting for IO on self. It has no effect on tasks bound to self event loop, but tasks running on different executors call `poll` right afterwards, so they immediately fail instead of hanging.
1 parent 03ac1ea commit 3440120

File tree

11 files changed

+107
-38
lines changed

11 files changed

+107
-38
lines changed

src/channel.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl<T> Stream for Receiver<T> {
9191
match self.rx.get_ref().try_recv() {
9292
Ok(t) => Ok(Async::Ready(Some(t))),
9393
Err(TryRecvError::Empty) => {
94-
self.rx.need_read();
94+
try!(self.rx.need_read());
9595
Ok(Async::NotReady)
9696
}
9797
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),

src/event_loop/mod.rs

+44-11
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::io::{self, ErrorKind};
33
use std::mem;
44
use std::rc::Rc;
55
use std::sync::Arc;
6-
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
6+
use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
77
use std::time::{Instant, Duration};
88

99
use futures::{Future, Poll, IntoFuture, Async};
@@ -37,7 +37,7 @@ pub struct Loop {
3737
id: usize,
3838
io: mio::Poll,
3939
events: mio::Events,
40-
tx: Arc<Sender<Message>>,
40+
inner: Arc<LoopHandleInner>,
4141
rx: Receiver<Message>,
4242
io_dispatch: RefCell<Slab<ScheduledIo, usize>>,
4343
task_dispatch: RefCell<Slab<ScheduledTask, usize>>,
@@ -62,6 +62,31 @@ pub struct Loop {
6262
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>,
6363
}
6464

65+
impl Drop for Loop {
66+
fn drop(&mut self) {
67+
// mark event loop as dead, so all schedule operations will be rejected
68+
self.inner.alive.store(false, Ordering::SeqCst);
69+
70+
// Unpark all tasks.
71+
// It has no effect for tasks in this event loop,
72+
// however tasks in another executors get an error
73+
// when they do `poll` right after wakeup.
74+
for io in self.io_dispatch.borrow_mut().iter_mut() {
75+
if let Some(ref mut reader) = io.reader {
76+
reader.unpark();
77+
}
78+
if let Some(ref mut writer) = io.writer {
79+
writer.unpark();
80+
}
81+
}
82+
}
83+
}
84+
85+
struct LoopHandleInner {
86+
tx: Sender<Message>,
87+
alive: AtomicBool,
88+
}
89+
6590
/// Handle to an event loop, used to construct I/O objects, send messages, and
6691
/// otherwise interact indirectly with the event loop itself.
6792
///
@@ -70,7 +95,7 @@ pub struct Loop {
7095
#[derive(Clone)]
7196
pub struct LoopHandle {
7297
id: usize,
73-
tx: Arc<Sender<Message>>,
98+
inner: Arc<LoopHandleInner>,
7499
}
75100

76101
/// A non-sendable handle to an event loop, useful for manufacturing instances
@@ -145,7 +170,10 @@ impl Loop {
145170
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
146171
io: io,
147172
events: mio::Events::with_capacity(1024),
148-
tx: Arc::new(tx),
173+
inner: Arc::new(LoopHandleInner {
174+
tx: tx,
175+
alive: AtomicBool::new(true),
176+
}),
149177
rx: rx,
150178
io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
151179
task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
@@ -169,7 +197,7 @@ impl Loop {
169197
pub fn handle(&self) -> LoopHandle {
170198
LoopHandle {
171199
id: self.id,
172-
tx: self.tx.clone(),
200+
inner: self.inner.clone(),
173201
}
174202
}
175203

@@ -506,18 +534,23 @@ impl Loop {
506534
}
507535

508536
impl LoopHandle {
509-
fn send(&self, msg: Message) {
537+
fn send(&self, msg: Message) -> io::Result<()> {
538+
if !self.inner.alive.load(Ordering::SeqCst) {
539+
return Err(io::Error::new(io::ErrorKind::Other, "loop has been dropped"));
540+
}
541+
510542
self.with_loop(|lp| {
511543
match lp {
512544
Some(lp) => {
513545
// Need to execute all existing requests first, to ensure
514546
// that our message is processed "in order"
515547
lp.consume_queue();
516548
lp.notify(msg);
549+
Ok(())
517550
}
518551
None => {
519-
match self.tx.send(msg) {
520-
Ok(()) => {}
552+
match self.inner.tx.send(msg) {
553+
Ok(()) => Ok(()),
521554

522555
// This should only happen when there was an error
523556
// writing to the pipe to wake up the event loop,
@@ -555,15 +588,15 @@ impl LoopHandle {
555588
///
556589
/// Note that while the closure, `F`, requires the `Send` bound as it might
557590
/// cross threads, the future `R` does not.
558-
pub fn spawn<F, R>(&self, f: F)
591+
pub fn spawn<F, R>(&self, f: F) -> io::Result<()>
559592
where F: FnOnce(&LoopPin) -> R + Send + 'static,
560593
R: IntoFuture<Item=(), Error=()>,
561594
R::Future: 'static,
562595
{
563596
self.send(Message::Run(Box::new(|lp: &Loop| {
564597
let f = f(&lp.pin());
565598
lp.spawn(Box::new(f.into_future()));
566-
})));
599+
})))
567600
}
568601
}
569602

@@ -626,7 +659,7 @@ impl<T, U> LoopFuture<T, U>
626659
task.unpark();
627660
});
628661
self.result = Some((result.clone(), token));
629-
self.loop_handle.send(g(data.take().unwrap(), result));
662+
try!(self.loop_handle.send(g(data.take().unwrap(), result)));
630663
Ok(Async::NotReady)
631664
}
632665
}

src/event_loop/source.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl LoopHandle {
7474
///
7575
/// This function will also panic if there is not a currently running future
7676
/// task.
77-
pub fn schedule_read(&self, tok: &IoToken) {
78-
self.send(Message::Schedule(tok.token, task::park(), Direction::Read));
77+
pub fn schedule_read(&self, tok: &IoToken) -> io::Result<()> {
78+
self.send(Message::Schedule(tok.token, task::park(), Direction::Read))
7979
}
8080

8181
/// Schedule the current future task to receive a notification when the
@@ -101,8 +101,8 @@ impl LoopHandle {
101101
///
102102
/// This function will also panic if there is not a currently running future
103103
/// task.
104-
pub fn schedule_write(&self, tok: &IoToken) {
105-
self.send(Message::Schedule(tok.token, task::park(), Direction::Write));
104+
pub fn schedule_write(&self, tok: &IoToken) -> io::Result<()> {
105+
self.send(Message::Schedule(tok.token, task::park(), Direction::Write))
106106
}
107107

108108
/// Unregister all information associated with a token on an event loop,
@@ -127,8 +127,8 @@ impl LoopHandle {
127127
/// This function will panic if the event loop this handle is associated
128128
/// with has gone away, or if there is an error communicating with the event
129129
/// loop.
130-
pub fn drop_source(&self, tok: &IoToken) {
131-
self.send(Message::DropSource(tok.token));
130+
pub fn drop_source(&self, tok: &IoToken) -> io::Result<()> {
131+
self.send(Message::DropSource(tok.token))
132132
}
133133
}
134134

src/event_loop/timeout.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl LoopHandle {
2525
///
2626
/// This method will panic if the timeout specified was not created by this
2727
/// loop handle's `add_timeout` method.
28-
pub fn update_timeout(&self, timeout: &TimeoutToken) {
28+
pub fn update_timeout(&self, timeout: &TimeoutToken) -> io::Result<()> {
2929
self.send(Message::UpdateTimeout(timeout.token, task::park()))
3030
}
3131

@@ -35,7 +35,7 @@ impl LoopHandle {
3535
///
3636
/// This method will panic if the timeout specified was not created by this
3737
/// loop handle's `add_timeout` method.
38-
pub fn cancel_timeout(&self, timeout: &TimeoutToken) {
38+
pub fn cancel_timeout(&self, timeout: &TimeoutToken) -> io::Result<()> {
3939
debug!("cancel timeout {}", timeout.token);
4040
self.send(Message::CancelTimeout(timeout.token))
4141
}

src/readiness_stream.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl<E> ReadinessStream<E> {
6666
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
6767
Ok(Async::Ready(()))
6868
} else {
69-
self.handle.schedule_read(&self.token);
69+
try!(self.handle.schedule_read(&self.token));
7070
Ok(Async::NotReady)
7171
}
7272
}
@@ -86,7 +86,7 @@ impl<E> ReadinessStream<E> {
8686
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
8787
Ok(Async::Ready(()))
8888
} else {
89-
self.handle.schedule_write(&self.token);
89+
try!(self.handle.schedule_write(&self.token));
9090
Ok(Async::NotReady)
9191
}
9292
}
@@ -102,9 +102,9 @@ impl<E> ReadinessStream<E> {
102102
/// The flag indicating that this stream is readable is unset and the
103103
/// current task is scheduled to receive a notification when the stream is
104104
/// then again readable.
105-
pub fn need_read(&self) {
105+
pub fn need_read(&self) -> io::Result<()> {
106106
self.readiness.fetch_and(!1, Ordering::SeqCst);
107-
self.handle.schedule_read(&self.token);
107+
self.handle.schedule_read(&self.token)
108108
}
109109

110110
/// Indicates to this source of events that the corresponding I/O object is
@@ -118,9 +118,9 @@ impl<E> ReadinessStream<E> {
118118
/// The flag indicating that this stream is writable is unset and the
119119
/// current task is scheduled to receive a notification when the stream is
120120
/// then again writable.
121-
pub fn need_write(&self) {
121+
pub fn need_write(&self) -> io::Result<()> {
122122
self.readiness.fetch_and(!2, Ordering::SeqCst);
123-
self.handle.schedule_write(&self.token);
123+
self.handle.schedule_write(&self.token)
124124
}
125125

126126
/// Returns a reference to the event loop handle that this readiness stream
@@ -161,6 +161,7 @@ impl<E> Future for ReadinessStreamNew<E>
161161

162162
impl<E> Drop for ReadinessStream<E> {
163163
fn drop(&mut self) {
164-
self.handle.drop_source(&self.token)
164+
// Ignore error
165+
drop(self.handle.drop_source(&self.token))
165166
}
166167
}

src/tcp.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl TcpListener {
9696
match self.inner.io.get_ref().accept() {
9797
Ok(pair) => Ok(Async::Ready(Some(pair))),
9898
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
99-
self.inner.io.need_read();
99+
try!(self.inner.io.need_read());
100100
Ok(Async::NotReady)
101101
}
102102
Err(e) => Err(e)
@@ -391,7 +391,7 @@ impl<'a> Read for &'a TcpStream {
391391
}
392392
let r = self.io.get_ref().read(buf);
393393
if is_wouldblock(&r) {
394-
self.io.need_read();
394+
try!(self.io.need_read());
395395
}
396396
return r
397397
}
@@ -404,7 +404,7 @@ impl<'a> Write for &'a TcpStream {
404404
}
405405
let r = self.io.get_ref().write(buf);
406406
if is_wouldblock(&r) {
407-
self.io.need_write();
407+
try!(self.io.need_write());
408408
}
409409
return r
410410
}
@@ -415,7 +415,7 @@ impl<'a> Write for &'a TcpStream {
415415
}
416416
let r = self.io.get_ref().flush();
417417
if is_wouldblock(&r) {
418-
self.io.need_write();
418+
try!(self.io.need_write());
419419
}
420420
return r
421421
}

src/timeout.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,15 @@ impl Future for Timeout {
5454
if *self.token.when() <= now {
5555
Ok(Async::Ready(()))
5656
} else {
57-
self.handle.update_timeout(&self.token);
57+
try!(self.handle.update_timeout(&self.token));
5858
Ok(Async::NotReady)
5959
}
6060
}
6161
}
6262

6363
impl Drop for Timeout {
6464
fn drop(&mut self) {
65-
self.handle.cancel_timeout(&self.token);
65+
// Ignore error
66+
drop(self.handle.cancel_timeout(&self.token));
6667
}
6768
}

src/udp.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl UdpSocket {
9090
match self.io.get_ref().send_to(buf, target) {
9191
Ok(Some(n)) => Ok(n),
9292
Ok(None) => {
93-
self.io.need_write();
93+
try!(self.io.need_write());
9494
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block"))
9595
}
9696
Err(e) => Err(e),
@@ -106,7 +106,7 @@ impl UdpSocket {
106106
match self.io.get_ref().recv_from(buf) {
107107
Ok(Some(n)) => Ok(n),
108108
Ok(None) => {
109-
self.io.need_read();
109+
try!(self.io.need_read());
110110
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block"))
111111
}
112112
Err(e) => Err(e),

tests/channel.rs

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
extern crate tokio_core;
2+
extern crate futures;
3+
4+
use tokio_core::Loop;
5+
6+
use futures::stream::Stream;
7+
8+
9+
#[test]
10+
fn recv_after_loop_drop() {
11+
let mut lp: Loop = Loop::new().unwrap();
12+
13+
let (_sender, receiver) = lp.handle().channel::<u32>();
14+
15+
let receiver = lp.run(receiver).unwrap();
16+
17+
drop(lp);
18+
19+
assert!(receiver.wait().next().unwrap().is_err());
20+
}
21+
22+
#[test]
23+
fn recv_after_loop_and_sender_drop() {
24+
let mut lp: Loop = Loop::new().unwrap();
25+
26+
let (sender, receiver) = lp.handle().channel::<u32>();
27+
28+
let receiver = lp.run(receiver).unwrap();
29+
30+
drop(lp);
31+
drop(sender);
32+
33+
assert!(receiver.wait().next().unwrap().is_err());
34+
}

tests/poll.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ fn poll_after_ready() {
5252

5353
fn poll(&mut self) -> Poll<(), ()> {
5454
if self.n == 0 {
55-
self.handle.schedule_read(&self.token);
55+
self.handle.schedule_read(&self.token).expect("schedule_read");
5656
self.n += 1;
5757
Ok(Async::NotReady)
5858
} else {

tests/spawn.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ fn simple() {
2121
tx2.complete(2);
2222
Ok(())
2323
})
24-
});
24+
}).expect("spawn");
2525

2626
assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2));
2727
}
@@ -41,7 +41,7 @@ fn spawn_in_poll() {
4141
tx2.complete(2);
4242
Ok(())
4343
})
44-
});
44+
}).expect("spawn");
4545
Ok(())
4646
}));
4747

0 commit comments

Comments
 (0)