Skip to content

Commit 03edb0f

Browse files
conradludgatetaiki-e
authored andcommitted
Fix bug in FuturesOrdered::push_front (#2664)
1 parent 1340b16 commit 03edb0f

File tree

2 files changed

+30
-10
lines changed

2 files changed

+30
-10
lines changed

futures-util/src/stream/futures_ordered.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pin_project! {
1919
struct OrderWrapper<T> {
2020
#[pin]
2121
data: T, // A future or a future's output
22-
index: usize,
22+
index: isize,
2323
}
2424
}
2525

@@ -95,8 +95,8 @@ where
9595
pub struct FuturesOrdered<T: Future> {
9696
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
9797
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
98-
next_incoming_index: usize,
99-
next_outgoing_index: usize,
98+
next_incoming_index: isize,
99+
next_outgoing_index: isize,
100100
}
101101

102102
impl<T: Future> Unpin for FuturesOrdered<T> {}
@@ -160,13 +160,9 @@ impl<Fut: Future> FuturesOrdered<Fut> {
160160
/// task notifications. This future will be the next future to be returned
161161
/// complete.
162162
pub fn push_front(&mut self, future: Fut) {
163-
if self.next_outgoing_index == 0 {
164-
self.push_back(future)
165-
} else {
166-
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
167-
self.next_outgoing_index -= 1;
168-
self.in_progress_queue.push(wrapped);
169-
}
163+
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
164+
self.next_outgoing_index -= 1;
165+
self.in_progress_queue.push(wrapped);
170166
}
171167
}
172168

futures/tests/stream_futures_ordered.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,27 @@ fn queue_never_unblocked() {
146146
assert!(stream.poll_next_unpin(cx).is_pending());
147147
assert!(stream.poll_next_unpin(cx).is_pending());
148148
}
149+
150+
#[test]
151+
fn test_push_front_negative() {
152+
let (a_tx, a_rx) = oneshot::channel::<i32>();
153+
let (b_tx, b_rx) = oneshot::channel::<i32>();
154+
let (c_tx, c_rx) = oneshot::channel::<i32>();
155+
156+
let mut stream = FuturesOrdered::new();
157+
158+
let mut cx = noop_context();
159+
160+
stream.push_front(a_rx);
161+
stream.push_front(b_rx);
162+
stream.push_front(c_rx);
163+
164+
a_tx.send(1).unwrap();
165+
b_tx.send(2).unwrap();
166+
c_tx.send(3).unwrap();
167+
168+
// These should all be recieved in reverse order
169+
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
170+
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
171+
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
172+
}

0 commit comments

Comments
 (0)