Skip to content

Commit ad35412

Browse files
fix unsoundness due to not releasing task during mapped_futures cancel()
1 parent b89460e commit ad35412

File tree

2 files changed

+9
-8
lines changed
  • futures-util/src/stream

2 files changed

+9
-8
lines changed

futures-util/src/stream/futures_unordered_internal/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ impl<K, Fut, S: ReleasesTask<K>> FuturesUnorderedInternal<K, Fut, S> {
253253
/// Releases the task. It destroys the future inside and either drops
254254
/// the `Arc<Task>` or transfers ownership to the ready to run queue.
255255
/// The task this method is called on must have been unlinked before.
256-
fn release_task(&mut self, task: Arc<Task<K, Fut>>) {
256+
pub(super) fn release_task(&mut self, task: Arc<Task<K, Fut>>) {
257257
if let Some(key) = task.key() {
258258
self.inner.release_task(key);
259259
}

futures-util/src/stream/mapped_futures/mod.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ impl<K: Hash + Eq, Fut> MappedFutures<K, Fut> {
167167
if let Some(task) = self.set().take(key) {
168168
unsafe {
169169
if (*task.future.get()).is_some() {
170-
*task.future.get() = None;
171170
self.inner.unlink(Arc::as_ptr(&task.inner));
171+
self.inner.release_task(task.inner.clone());
172172
return true;
173173
}
174174
}
@@ -186,6 +186,7 @@ impl<K: Hash + Eq, Fut> MappedFutures<K, Fut> {
186186
unsafe {
187187
let fut = (*task.future.get()).take().unwrap();
188188
self.inner.unlink(Arc::as_ptr(&task.inner));
189+
self.inner.release_task(task.inner.clone());
189190
return Some(fut);
190191
}
191192
}
@@ -311,7 +312,7 @@ impl<K: Hash + Eq, Fut: Future> Stream for MappedFutures<K, Fut> {
311312
// let key = task.take_key();
312313
// MappedFutures::set(self.get_mut()).remove(&key);
313314
MappedFutures::set(self.get_mut()).remove(task.key().unwrap());
314-
return Poll::Ready(Some((task.take_key(), output)));
315+
Poll::Ready(Some((task.take_key(), output)))
315316
}
316317
None => Poll::Ready(None),
317318
}
@@ -450,13 +451,13 @@ pub mod tests {
450451
#[test]
451452
fn mutate() {
452453
let mut futures: MappedFutures<u32, Delay> = MappedFutures::new();
453-
insert_millis(&mut futures, 1, 50);
454-
insert_millis(&mut futures, 2, 100);
455-
insert_millis(&mut futures, 3, 150);
456-
insert_millis(&mut futures, 4, 200);
454+
insert_millis(&mut futures, 1, 500);
455+
insert_millis(&mut futures, 2, 1000);
456+
insert_millis(&mut futures, 3, 1500);
457+
insert_millis(&mut futures, 4, 2000);
457458

458459
assert_eq!(block_on(futures.next()).unwrap().0, 1);
459-
futures.get_mut(&3).unwrap().reset(Duration::from_millis(30));
460+
futures.get_mut(&3).unwrap().reset(Duration::from_millis(300));
460461
assert_eq!(block_on(futures.next()).unwrap().0, 3);
461462
assert_eq!(block_on(futures.next()).unwrap().0, 2);
462463
assert_eq!(block_on(futures.next()).unwrap().0, 4);

0 commit comments

Comments
 (0)