Skip to content

Commit 2b2f615

Browse files
committed
Do not hang in poll if reactor is destroyed
After channel receiver dropped, sender send returns an error. So any further `poll` requests fail.
1 parent cd331dd commit 2b2f615

File tree

12 files changed

+157
-86
lines changed

12 files changed

+157
-86
lines changed

src/channel.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ impl<T> Stream for Receiver<T> {
9393
type Error = io::Error;
9494

9595
fn poll(&mut self) -> Poll<Option<T>, io::Error> {
96-
if let Async::NotReady = self.rx.poll_read() {
96+
if let Async::NotReady = try!(self.rx.poll_read()) {
9797
return Ok(Async::NotReady)
9898
}
9999
match self.rx.get_ref().try_recv() {
100100
Ok(t) => Ok(Async::Ready(Some(t))),
101101
Err(TryRecvError::Empty) => {
102-
self.rx.need_read();
102+
try!(self.rx.need_read());
103103
Ok(Async::NotReady)
104104
}
105105
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),

src/io/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
66
use std::io::{self, Read, Write};
77

8-
use futures::{BoxFuture, Async};
8+
use futures::{BoxFuture, Async, Poll};
99
use futures::stream::BoxStream;
1010

1111
/// A convenience typedef around a `Future` whose error component is `io::Error`
@@ -76,8 +76,8 @@ pub trait Io: Read + Write {
7676
///
7777
/// This method is likely to panic if called from outside the context of a
7878
/// future's task.
79-
fn poll_read(&mut self) -> Async<()> {
80-
Async::Ready(())
79+
fn poll_read(&mut self) -> Poll<(), io::Error> {
80+
Ok(Async::Ready(()))
8181
}
8282

8383
/// Tests to see if this I/O object may be writable.
@@ -98,8 +98,8 @@ pub trait Io: Read + Write {
9898
///
9999
/// This method is likely to panic if called from outside the context of a
100100
/// future's task.
101-
fn poll_write(&mut self) -> Async<()> {
102-
Async::Ready(())
101+
fn poll_write(&mut self) -> Poll<(), io::Error> {
102+
Ok(Async::Ready(()))
103103
}
104104

105105
/// Helper method for splitting this read/write object into two halves.

src/net/tcp.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl TcpListener {
7676
}
7777

7878
/// Test whether this socket is ready to be read or not.
79-
pub fn poll_read(&self) -> Async<()> {
79+
pub fn poll_read(&self) -> Poll<(), io::Error> {
8080
self.io.poll_read()
8181
}
8282

@@ -103,13 +103,13 @@ impl TcpListener {
103103
type Error = io::Error;
104104

105105
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
106-
if let Async::NotReady = self.inner.io.poll_read() {
106+
if let Async::NotReady = try!(self.inner.io.poll_read()) {
107107
return Ok(Async::NotReady)
108108
}
109109
match self.inner.io.get_ref().accept() {
110110
Ok(pair) => Ok(Async::Ready(Some(pair))),
111111
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
112-
self.inner.io.need_read();
112+
try!(self.inner.io.need_read());
113113
Ok(Async::NotReady)
114114
}
115115
Err(e) => Err(e)
@@ -128,7 +128,7 @@ impl TcpListener {
128128
});
129129
tx.complete(res);
130130
Ok(())
131-
});
131+
}).expect("failed to spawn");
132132
rx.then(|r| r.expect("shouldn't be canceled"))
133133
}).boxed(),
134134
}
@@ -266,7 +266,7 @@ impl TcpStream {
266266
/// get a notification when the socket does become readable. That is, this
267267
/// is only suitable for calling in a `Future::poll` method and will
268268
/// automatically handle ensuring a retry once the socket is readable again.
269-
pub fn poll_read(&self) -> Async<()> {
269+
pub fn poll_read(&self) -> Poll<(), io::Error> {
270270
self.io.poll_read()
271271
}
272272

@@ -276,7 +276,7 @@ impl TcpStream {
276276
/// get a notification when the socket does become writable. That is, this
277277
/// is only suitable for calling in a `Future::poll` method and will
278278
/// automatically handle ensuring a retry once the socket is writable again.
279-
pub fn poll_write(&self) -> Async<()> {
279+
pub fn poll_write(&self) -> Poll<(), io::Error> {
280280
self.io.poll_write()
281281
}
282282

@@ -379,11 +379,11 @@ impl Write for TcpStream {
379379
}
380380

381381
impl Io for TcpStream {
382-
fn poll_read(&mut self) -> Async<()> {
382+
fn poll_read(&mut self) -> Poll<(), io::Error> {
383383
<TcpStream>::poll_read(self)
384384
}
385385

386-
fn poll_write(&mut self) -> Async<()> {
386+
fn poll_write(&mut self) -> Poll<(), io::Error> {
387387
<TcpStream>::poll_write(self)
388388
}
389389
}
@@ -405,11 +405,11 @@ impl<'a> Write for &'a TcpStream {
405405
}
406406

407407
impl<'a> Io for &'a TcpStream {
408-
fn poll_read(&mut self) -> Async<()> {
408+
fn poll_read(&mut self) -> Poll<(), io::Error> {
409409
<TcpStream>::poll_read(self)
410410
}
411411

412-
fn poll_write(&mut self) -> Async<()> {
412+
fn poll_write(&mut self) -> Poll<(), io::Error> {
413413
<TcpStream>::poll_write(self)
414414
}
415415
}
@@ -446,7 +446,7 @@ impl Future for TcpStreamConnect {
446446
// actually hit an error or not.
447447
//
448448
// If all that succeeded then we ship everything on up.
449-
if let Async::NotReady = stream.io.poll_write() {
449+
if let Async::NotReady = try!(stream.io.poll_write()) {
450450
return Ok(Async::NotReady)
451451
}
452452
if let Some(e) = try!(stream.io.get_ref().take_error()) {

src/net/udp.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl UdpSocket {
6161
/// get a notification when the socket does become readable. That is, this
6262
/// is only suitable for calling in a `Future::poll` method and will
6363
/// automatically handle ensuring a retry once the socket is readable again.
64-
pub fn poll_read(&self) -> Async<()> {
64+
pub fn poll_read(&self) -> Poll<(), io::Error> {
6565
self.io.poll_read()
6666
}
6767

@@ -71,7 +71,7 @@ impl UdpSocket {
7171
/// get a notification when the socket does become writable. That is, this
7272
/// is only suitable for calling in a `Future::poll` method and will
7373
/// automatically handle ensuring a retry once the socket is writable again.
74-
pub fn poll_write(&self) -> Async<()> {
74+
pub fn poll_write(&self) -> Poll<(), io::Error> {
7575
self.io.poll_write()
7676
}
7777

@@ -81,13 +81,13 @@ impl UdpSocket {
8181
/// Address type can be any implementor of `ToSocketAddrs` trait. See its
8282
/// documentation for concrete examples.
8383
pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
84-
if let Async::NotReady = self.io.poll_write() {
84+
if let Async::NotReady = try!(self.io.poll_write()) {
8585
return Err(mio::would_block())
8686
}
8787
match self.io.get_ref().send_to(buf, target) {
8888
Ok(Some(n)) => Ok(n),
8989
Ok(None) => {
90-
self.io.need_write();
90+
try!(self.io.need_write());
9191
Err(mio::would_block())
9292
}
9393
Err(e) => Err(e),
@@ -97,13 +97,13 @@ impl UdpSocket {
9797
/// Receives data from the socket. On success, returns the number of bytes
9898
/// read and the address from whence the data came.
9999
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
100-
if let Async::NotReady = self.io.poll_read() {
100+
if let Async::NotReady = try!(self.io.poll_read()) {
101101
return Err(mio::would_block())
102102
}
103103
match self.io.get_ref().recv_from(buf) {
104104
Ok(Some(n)) => Ok(n),
105105
Ok(None) => {
106-
self.io.need_read();
106+
try!(self.io.need_read());
107107
Err(mio::would_block())
108108
}
109109
Err(e) => Err(e),

src/reactor/channel.rs

+24-5
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,35 @@ use std::cell::Cell;
1010
use std::io;
1111
use std::marker;
1212
use std::sync::Arc;
13+
use std::sync::atomic::AtomicBool;
14+
use std::sync::atomic::Ordering;
1315

1416
use mio;
1517
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
1618

1719
use mpsc_queue::{Queue, PopResult};
1820

21+
struct Inner<T> {
22+
queue: Queue<T>,
23+
receiver_alive: AtomicBool,
24+
}
25+
1926
pub struct Sender<T> {
2027
ctl: SenderCtl,
21-
inner: Arc<Queue<T>>,
28+
inner: Arc<Inner<T>>,
2229
}
2330

2431
pub struct Receiver<T> {
2532
ctl: ReceiverCtl,
26-
inner: Arc<Queue<T>>,
33+
inner: Arc<Inner<T>>,
2734
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
2835
}
2936

3037
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
31-
let inner = Arc::new(Queue::new());
38+
let inner = Arc::new(Inner {
39+
queue: Queue::new(),
40+
receiver_alive: AtomicBool::new(true),
41+
});
3242
let (tx, rx) = ctl_pair();
3343

3444
let tx = Sender {
@@ -45,7 +55,10 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
4555

4656
impl<T> Sender<T> {
4757
pub fn send(&self, data: T) -> io::Result<()> {
48-
self.inner.push(data);
58+
if !self.inner.receiver_alive.load(Ordering::SeqCst) {
59+
return Err(io::Error::new(io::ErrorKind::Other, "receiver has been dropped"));
60+
}
61+
self.inner.queue.push(data);
4962
self.ctl.inc()
5063
}
5164
}
@@ -57,7 +70,7 @@ impl<T> Receiver<T> {
5770
//
5871
// We, however, are the only thread with a `Receiver<T>` because this
5972
// type is not `Sync`. and we never handed out another instance.
60-
match unsafe { self.inner.pop() } {
73+
match unsafe { self.inner.queue.pop() } {
6174
PopResult::Data(t) => {
6275
try!(self.ctl.dec());
6376
Ok(Some(t))
@@ -108,6 +121,12 @@ impl<T> mio::Evented for Receiver<T> {
108121
}
109122
}
110123

124+
impl<T> Drop for Receiver<T> {
125+
fn drop(&mut self) {
126+
self.inner.receiver_alive.store(false, Ordering::SeqCst);
127+
}
128+
}
129+
111130
impl<T> Clone for Sender<T> {
112131
fn clone(&self) -> Sender<T> {
113132
Sender {

src/reactor/io_token.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ impl IoToken {
8282
///
8383
/// This function will also panic if there is not a currently running future
8484
/// task.
85-
pub fn schedule_read(&self, handle: &Remote) {
86-
handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
85+
pub fn schedule_read(&self, handle: &Remote) -> io::Result<()> {
86+
handle.send(Message::Schedule(self.token, task::park(), Direction::Read))
8787
}
8888

8989
/// Schedule the current future task to receive a notification when the
@@ -109,8 +109,8 @@ impl IoToken {
109109
///
110110
/// This function will also panic if there is not a currently running future
111111
/// task.
112-
pub fn schedule_write(&self, handle: &Remote) {
113-
handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
112+
pub fn schedule_write(&self, handle: &Remote) -> io::Result<()> {
113+
handle.send(Message::Schedule(self.token, task::park(), Direction::Write))
114114
}
115115

116116
/// Unregister all information associated with a token on an event loop,
@@ -135,7 +135,7 @@ impl IoToken {
135135
/// This function will panic if the event loop this handle is associated
136136
/// with has gone away, or if there is an error communicating with the event
137137
/// loop.
138-
pub fn drop_source(&self, handle: &Remote) {
139-
handle.send(Message::DropSource(self.token));
138+
pub fn drop_source(&self, handle: &Remote) -> io::Result<()> {
139+
handle.send(Message::DropSource(self.token))
140140
}
141141
}

src/reactor/mod.rs

+34-16
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ const SLAB_CAPACITY: usize = 1024 * 64;
4444
pub struct Core {
4545
events: mio::Events,
4646
tx: Sender<Message>,
47-
rx: Receiver<Message>,
47+
// `rx` is `Option` here only because it is needed to be dropped explicitly
48+
// in `drop` before other things.
49+
rx: Option<Receiver<Message>>,
4850
inner: Rc<RefCell<Inner>>,
4951

5052
// Used for determining when the future passed to `run` is ready. Once the
@@ -143,7 +145,7 @@ impl Core {
143145
Ok(Core {
144146
events: mio::Events::with_capacity(1024),
145147
tx: tx,
146-
rx: rx,
148+
rx: Some(rx),
147149
_future_registration: future_pair.0,
148150
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
149151

@@ -378,8 +380,9 @@ impl Core {
378380

379381
fn consume_queue(&self) {
380382
debug!("consuming notification queue");
383+
let rx = self.rx.as_ref().unwrap();
381384
// TODO: can we do better than `.unwrap()` here?
382-
while let Some(msg) = self.rx.recv().unwrap() {
385+
while let Some(msg) = rx.recv().unwrap() {
383386
self.notify(msg);
384387
}
385388
}
@@ -407,6 +410,27 @@ impl Core {
407410
}
408411
}
409412

413+
impl Drop for Core {
414+
fn drop(&mut self) {
415+
// Destroy the receiver, so all schedule operations will be rejected.
416+
// Do it explicitly before unparking, to avoid race condition.
417+
self.rx.take();
418+
419+
// Unpark all tasks.
420+
// It has no effect for tasks in this event loop,
421+
// however tasks in another executors get an error
422+
// when they do `poll` right after wakeup.
423+
for io in self.inner.borrow_mut().io_dispatch.iter_mut() {
424+
if let Some(ref mut reader) = io.reader {
425+
reader.unpark();
426+
}
427+
if let Some(ref mut writer) = io.writer {
428+
writer.unpark();
429+
}
430+
}
431+
}
432+
}
433+
410434
impl Inner {
411435
fn add_source(&mut self, source: &mio::Evented)
412436
-> io::Result<(Arc<AtomicUsize>, usize)> {
@@ -497,26 +521,20 @@ impl Inner {
497521
}
498522

499523
impl Remote {
500-
fn send(&self, msg: Message) {
524+
fn send(&self, msg: Message) -> io::Result<()> {
501525
self.with_loop(|lp| {
502526
match lp {
503527
Some(lp) => {
504528
// Need to execute all existing requests first, to ensure
505529
// that our message is processed "in order"
506530
lp.consume_queue();
507531
lp.notify(msg);
532+
Ok(())
508533
}
509534
None => {
510-
match self.tx.send(msg) {
511-
Ok(()) => {}
512-
513-
// This should only happen when there was an error
514-
// writing to the pipe to wake up the event loop,
515-
// hopefully that never happens
516-
Err(e) => {
517-
panic!("error sending message to event loop: {}", e)
518-
}
519-
}
535+
// May return an error if receiver is closed
536+
// or if there was an error writing to the pipe.
537+
self.tx.send(msg)
520538
}
521539
}
522540
})
@@ -547,15 +565,15 @@ impl Remote {
547565
///
548566
/// Note that while the closure, `F`, requires the `Send` bound as it might
549567
/// cross threads, the future `R` does not.
550-
pub fn spawn<F, R>(&self, f: F)
568+
pub fn spawn<F, R>(&self, f: F) -> io::Result<()>
551569
where F: FnOnce(&Handle) -> R + Send + 'static,
552570
R: IntoFuture<Item=(), Error=()>,
553571
R::Future: 'static,
554572
{
555573
self.send(Message::Run(Box::new(|lp: &Core| {
556574
let f = f(&lp.handle());
557575
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
558-
})));
576+
})))
559577
}
560578
}
561579

0 commit comments

Comments
 (0)