Skip to content

Commit 8d9ecc1

Browse files
committed
Updated test and ran fmt
1 parent 02d3be6 commit 8d9ecc1

File tree

4 files changed

+16
-17
lines changed

4 files changed

+16
-17
lines changed

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ mod par_stream;
5252
pub use from_parallel_stream::FromParallelStream;
5353
pub use from_stream::{from_stream, FromStream};
5454
pub use into_parallel_stream::IntoParallelStream;
55-
pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take, Any};
55+
pub use par_stream::{Any, ForEach, Map, NextFuture, ParallelStream, Take};
5656

5757
pub mod prelude;
5858
pub mod vec;

src/par_stream/any.rs

+12-14
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use async_std::prelude::*;
21
use async_std::future::Future;
3-
use pin_project_lite::pin_project;
4-
use async_std::task::{self, Context, Poll};
2+
use async_std::prelude::*;
53
use async_std::sync::{self, Receiver, Sender};
4+
use async_std::task::{self, Context, Poll};
5+
use pin_project_lite::pin_project;
66

77
use std::pin::Pin;
88
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
@@ -59,13 +59,14 @@ impl Any {
5959
task::spawn(async move {
6060
// Execute the closure.
6161
let res = f(item).await;
62+
value.fetch_or(res, Ordering::SeqCst);
6263

6364
// Wake up the receiver if we know we're done.
6465
ref_count.fetch_sub(1, Ordering::SeqCst);
65-
if res {
66-
value.fetch_or(true, Ordering::SeqCst);
67-
sender.send(()).await;
68-
} else if exhausted.load(Ordering::SeqCst) && ref_count.load(Ordering::SeqCst) == 0 {
66+
if value.load(Ordering::SeqCst)
67+
|| (exhausted.load(Ordering::SeqCst)
68+
&& ref_count.load(Ordering::SeqCst) == 0)
69+
{
6970
sender.send(()).await;
7071
}
7172
});
@@ -91,13 +92,10 @@ impl Future for Any {
9192

9293
#[async_std::test]
9394
async fn smoke() {
94-
let s = async_std::stream::repeat(5usize);
95+
let s = async_std::stream::from_iter(vec![6, 9, 0, 7, 10]);
9596
let result = crate::from_stream(s)
96-
.take(3)
97-
.any(|n| async move {
98-
n * 2 < 9
99-
})
97+
.any(|n| async move { n * 2 < 9 })
10098
.await;
101-
102-
assert_eq!(result, false);
99+
100+
assert!(result);
103101
}

src/par_stream/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ use std::pin::Pin;
55

66
use crate::FromParallelStream;
77

8+
pub use any::Any;
89
pub use for_each::ForEach;
910
pub use map::Map;
1011
pub use next::NextFuture;
1112
pub use take::Take;
12-
pub use any::Any;
1313

14+
mod any;
1415
mod for_each;
1516
mod map;
1617
mod next;
1718
mod take;
18-
mod any;
1919

2020
/// Parallel version of the standard `Stream` trait.
2121
pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {

tests/test.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

0 commit comments

Comments
 (0)