Skip to content

Commit be67640

Browse files
committed
Do not hang in poll if reactor is destroyed
If channel is dropped, receiver may still return EOF, and if channel is alive, receiver produces an error.
1 parent 1d40bf1 commit be67640

File tree

11 files changed

+146
-48
lines changed

11 files changed

+146
-48
lines changed

src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<T> Stream for Receiver<T> {
9999
match self.rx.get_ref().try_recv() {
100100
Ok(t) => Ok(Async::Ready(Some(t))),
101101
Err(TryRecvError::Empty) => {
102-
self.rx.need_read();
102+
try!(self.rx.need_read());
103103
Ok(Async::NotReady)
104104
}
105105
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),

src/net/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl TcpListener {
108108
match self.inner.io.get_ref().accept() {
109109
Ok(pair) => Ok(Async::Ready(Some(pair))),
110110
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
111-
self.inner.io.need_read();
111+
try!(self.inner.io.need_read());
112112
Ok(Async::NotReady)
113113
}
114114
Err(e) => Err(e)
@@ -127,7 +127,7 @@ impl TcpListener {
127127
});
128128
tx.complete(res);
129129
Ok(())
130-
});
130+
}).expect("failed to spawn");
131131
rx.then(|r| r.expect("shouldn't be canceled"))
132132
}).boxed(),
133133
}

src/net/udp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl UdpSocket {
7979
match self.io.get_ref().send_to(buf, target) {
8080
Ok(Some(n)) => Ok(n),
8181
Ok(None) => {
82-
self.io.need_write();
82+
try!(self.io.need_write());
8383
Err(mio::would_block())
8484
}
8585
Err(e) => Err(e),
@@ -95,7 +95,7 @@ impl UdpSocket {
9595
match self.io.get_ref().recv_from(buf) {
9696
Ok(Some(n)) => Ok(n),
9797
Ok(None) => {
98-
self.io.need_read();
98+
try!(self.io.need_read());
9999
Err(mio::would_block())
100100
}
101101
Err(e) => Err(e),

src/reactor/channel.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,35 @@ use std::cell::Cell;
1010
use std::io;
1111
use std::marker;
1212
use std::sync::Arc;
13+
use std::sync::atomic::AtomicBool;
14+
use std::sync::atomic::Ordering;
1315

1416
use mio;
1517
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
1618

1719
use mpsc_queue::{Queue, PopResult};
1820

21+
struct Inner<T> {
22+
queue: Queue<T>,
23+
receiver_alive: AtomicBool,
24+
}
25+
1926
pub struct Sender<T> {
2027
ctl: SenderCtl,
21-
inner: Arc<Queue<T>>,
28+
inner: Arc<Inner<T>>,
2229
}
2330

2431
pub struct Receiver<T> {
2532
ctl: ReceiverCtl,
26-
inner: Arc<Queue<T>>,
33+
inner: Arc<Inner<T>>,
2734
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
2835
}
2936

3037
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
31-
let inner = Arc::new(Queue::new());
38+
let inner = Arc::new(Inner {
39+
queue: Queue::new(),
40+
receiver_alive: AtomicBool::new(true),
41+
});
3242
let (tx, rx) = ctl_pair();
3343

3444
let tx = Sender {
@@ -45,7 +55,10 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
4555

4656
impl<T> Sender<T> {
4757
pub fn send(&self, data: T) -> io::Result<()> {
48-
self.inner.push(data);
58+
if !self.inner.receiver_alive.load(Ordering::SeqCst) {
59+
return Err(io::Error::new(io::ErrorKind::Other, "receiver has been closed"));
60+
}
61+
self.inner.queue.push(data);
4962
self.ctl.inc()
5063
}
5164
}
@@ -57,7 +70,7 @@ impl<T> Receiver<T> {
5770
//
5871
// We, however, are the only thread with a `Receiver<T>` because this
5972
// type is not `Sync`. and we never handed out another instance.
60-
match unsafe { self.inner.pop() } {
73+
match unsafe { self.inner.queue.pop() } {
6174
PopResult::Data(t) => {
6275
try!(self.ctl.dec());
6376
Ok(Some(t))
@@ -85,6 +98,13 @@ impl<T> Receiver<T> {
8598
}
8699
}
87100

101+
// Close receiver, so further send operations would fail.
102+
// This function is used internally in `Core` and is not exposed as public API.
103+
pub fn close_receiver<T>(receiver: &Receiver<T>) {
104+
receiver.inner.as_ref().receiver_alive.store(false, Ordering::SeqCst);
105+
}
106+
107+
88108
// Just delegate everything to `self.ctl`
89109
impl<T> mio::Evented for Receiver<T> {
90110
fn register(&self,
@@ -108,6 +128,12 @@ impl<T> mio::Evented for Receiver<T> {
108128
}
109129
}
110130

131+
impl<T> Drop for Receiver<T> {
132+
fn drop(&mut self) {
133+
close_receiver(self);
134+
}
135+
}
136+
111137
impl<T> Clone for Sender<T> {
112138
fn clone(&self) -> Sender<T> {
113139
Sender {

src/reactor/io_token.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ impl IoToken {
7171
/// receive further notifications it will need to call `schedule_read`
7272
/// again.
7373
///
74+
/// This function returns an error if reactor is destroyed.
75+
///
7476
/// > **Note**: This method should generally not be used directly, but
7577
/// > rather the `ReadinessStream` type should be used instead.
7678
///
@@ -82,8 +84,8 @@ impl IoToken {
8284
///
8385
/// This function will also panic if there is not a currently running future
8486
/// task.
85-
pub fn schedule_read(&self, handle: &Remote) {
86-
handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
87+
pub fn schedule_read(&self, handle: &Remote) -> io::Result<()> {
88+
handle.send(Message::Schedule(self.token, task::park(), Direction::Read))
8789
}
8890

8991
/// Schedule the current future task to receive a notification when the
@@ -98,6 +100,8 @@ impl IoToken {
98100
/// receive further notifications it will need to call `schedule_write`
99101
/// again.
100102
///
103+
/// This function returns an error if reactor is destroyed.
104+
///
101105
/// > **Note**: This method should generally not be used directly, but
102106
/// > rather the `ReadinessStream` type should be used instead.
103107
///
@@ -109,8 +113,8 @@ impl IoToken {
109113
///
110114
/// This function will also panic if there is not a currently running future
111115
/// task.
112-
pub fn schedule_write(&self, handle: &Remote) {
113-
handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
116+
pub fn schedule_write(&self, handle: &Remote) -> io::Result<()> {
117+
handle.send(Message::Schedule(self.token, task::park(), Direction::Write))
114118
}
115119

116120
/// Unregister all information associated with a token on an event loop,
@@ -127,6 +131,8 @@ impl IoToken {
127131
/// ensure that the callbacks are **not** invoked, so pending scheduled
128132
/// callbacks cannot be relied upon to get called.
129133
///
134+
/// This function returns an error if reactor is destroyed.
135+
///
130136
/// > **Note**: This method should generally not be used directly, but
131137
/// > rather the `ReadinessStream` type should be used instead.
132138
///
@@ -135,7 +141,7 @@ impl IoToken {
135141
/// This function will panic if the event loop this handle is associated
136142
/// with has gone away, or if there is an error communicating with the event
137143
/// loop.
138-
pub fn drop_source(&self, handle: &Remote) {
139-
handle.send(Message::DropSource(self.token));
144+
pub fn drop_source(&self, handle: &Remote) -> io::Result<()> {
145+
handle.send(Message::DropSource(self.token))
140146
}
141147
}

src/reactor/mod.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,27 @@ impl Core {
408408
}
409409
}
410410

411+
impl Drop for Core {
412+
fn drop(&mut self) {
413+
// Close the receiver, so all schedule operations will be rejected.
414+
// Do it explicitly before unparking to avoid race condition.
415+
channel::close_receiver(&self.rx);
416+
417+
// Unpark all tasks.
418+
// It has no effect for tasks in this event loop,
419+
// however tasks in another executors get an error
420+
// when they do `poll` right after wakeup.
421+
for io in self.inner.borrow_mut().io_dispatch.iter_mut() {
422+
if let Some(ref mut reader) = io.reader {
423+
reader.unpark();
424+
}
425+
if let Some(ref mut writer) = io.writer {
426+
writer.unpark();
427+
}
428+
}
429+
}
430+
}
431+
411432
impl Inner {
412433
fn add_source(&mut self, source: &mio::Evented)
413434
-> io::Result<(Arc<AtomicUsize>, usize)> {
@@ -498,26 +519,20 @@ impl Inner {
498519
}
499520

500521
impl Remote {
501-
fn send(&self, msg: Message) {
522+
fn send(&self, msg: Message) -> io::Result<()> {
502523
self.with_loop(|lp| {
503524
match lp {
504525
Some(lp) => {
505526
// Need to execute all existing requests first, to ensure
506527
// that our message is processed "in order"
507528
lp.consume_queue();
508529
lp.notify(msg);
530+
Ok(())
509531
}
510532
None => {
511-
match self.tx.send(msg) {
512-
Ok(()) => {}
513-
514-
// This should only happen when there was an error
515-
// writing to the pipe to wake up the event loop,
516-
// hopefully that never happens
517-
Err(e) => {
518-
panic!("error sending message to event loop: {}", e)
519-
}
520-
}
533+
// May return an error if receiver is closed
534+
// or if there was an error writing to the pipe.
535+
self.tx.send(msg)
521536
}
522537
}
523538
})
@@ -548,15 +563,17 @@ impl Remote {
548563
///
549564
/// Note that while the closure, `F`, requires the `Send` bound as it might
550565
/// cross threads, the future `R` does not.
551-
pub fn spawn<F, R>(&self, f: F)
566+
///
567+
/// This function returns an error if reactor is destroyed.
568+
pub fn spawn<F, R>(&self, f: F) -> io::Result<()>
552569
where F: FnOnce(&Handle) -> R + Send + 'static,
553570
R: IntoFuture<Item=(), Error=()>,
554571
R::Future: 'static,
555572
{
556573
self.send(Message::Run(Box::new(|lp: &Core| {
557574
let f = f(&lp.handle());
558575
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
559-
})));
576+
})))
560577
}
561578
}
562579

src/reactor/poll_evented.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,11 @@ impl<E> PollEvented<E> {
7171
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
7272
Async::Ready(())
7373
} else {
74-
self.token.schedule_read(&self.handle);
75-
Async::NotReady
74+
match self.token.schedule_read(&self.handle) {
75+
Ok(()) => Async::NotReady,
76+
// next read will return error
77+
Err(_) => Async::Ready(()),
78+
}
7679
}
7780
}
7881

@@ -91,8 +94,12 @@ impl<E> PollEvented<E> {
9194
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
9295
Async::Ready(())
9396
} else {
94-
self.token.schedule_write(&self.handle);
95-
Async::NotReady
97+
// ignore error, will be handled in need_write
98+
match self.token.schedule_write(&self.handle) {
99+
Ok(()) => Async::NotReady,
100+
// next read will return error
101+
Err(_) => Async::Ready(()),
102+
}
96103
}
97104
}
98105

@@ -111,7 +118,9 @@ impl<E> PollEvented<E> {
111118
/// Note that it is also only valid to call this method if `poll_read`
112119
/// previously indicated that the object is readable. That is, this function
113120
/// must always be paired with calls to `poll_read` previously.
114-
pub fn need_read(&self) {
121+
///
122+
/// This function returns an error if reactor is destroyed.
123+
pub fn need_read(&self) -> io::Result<()> {
115124
self.readiness.fetch_and(!1, Ordering::SeqCst);
116125
self.token.schedule_read(&self.handle)
117126
}
@@ -131,7 +140,9 @@ impl<E> PollEvented<E> {
131140
/// Note that it is also only valid to call this method if `poll_write`
132141
/// previously indicated that the object is writeable. That is, this function
133142
/// must always be paired with calls to `poll_write` previously.
134-
pub fn need_write(&self) {
143+
///
144+
/// This function returns an error if reactor is destroyed.
145+
pub fn need_write(&self) -> io::Result<()> {
135146
self.readiness.fetch_and(!2, Ordering::SeqCst);
136147
self.token.schedule_write(&self.handle)
137148
}
@@ -162,7 +173,7 @@ impl<E: Read> Read for PollEvented<E> {
162173
}
163174
let r = self.get_mut().read(buf);
164175
if is_wouldblock(&r) {
165-
self.need_read();
176+
try!(self.need_read());
166177
}
167178
return r
168179
}
@@ -175,7 +186,7 @@ impl<E: Write> Write for PollEvented<E> {
175186
}
176187
let r = self.get_mut().write(buf);
177188
if is_wouldblock(&r) {
178-
self.need_write();
189+
try!(self.need_write());
179190
}
180191
return r
181192
}
@@ -186,7 +197,7 @@ impl<E: Write> Write for PollEvented<E> {
186197
}
187198
let r = self.get_mut().flush();
188199
if is_wouldblock(&r) {
189-
self.need_write();
200+
try!(self.need_write());
190201
}
191202
return r
192203
}
@@ -211,7 +222,7 @@ impl<'a, E> Read for &'a PollEvented<E>
211222
}
212223
let r = self.get_ref().read(buf);
213224
if is_wouldblock(&r) {
214-
self.need_read();
225+
try!(self.need_read());
215226
}
216227
return r
217228
}
@@ -226,7 +237,7 @@ impl<'a, E> Write for &'a PollEvented<E>
226237
}
227238
let r = self.get_ref().write(buf);
228239
if is_wouldblock(&r) {
229-
self.need_write();
240+
try!(self.need_write());
230241
}
231242
return r
232243
}
@@ -237,7 +248,7 @@ impl<'a, E> Write for &'a PollEvented<E>
237248
}
238249
let r = self.get_ref().flush();
239250
if is_wouldblock(&r) {
240-
self.need_write();
251+
try!(self.need_write());
241252
}
242253
return r
243254
}
@@ -264,6 +275,7 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
264275

265276
impl<E> Drop for PollEvented<E> {
266277
fn drop(&mut self) {
267-
self.token.drop_source(&self.handle);
278+
// Ignore error
279+
drop(self.token.drop_source(&self.handle));
268280
}
269281
}

0 commit comments

Comments
 (0)