Skip to content

Commit edb0933

Browse files
committed
Implement stream combinators using async_stream_block and generators
1 parent debc474 commit edb0933

File tree

4 files changed

+103
-141
lines changed

4 files changed

+103
-141
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ edition = "2018"
1212

1313
[dependencies]
1414
pin-utils = "=0.1.0-alpha.4"
15+
futures-async-stream = "0.1.0-alpha.1"
1516

1617
[dependencies.futures]
1718
version = "=0.3.0-alpha.17"

src/future.rs

+11-23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::future::Future;
22
use futures::stream::Stream;
3+
use futures_async_stream::async_stream_block;
34

45
use core::task::{Poll, Context};
56

@@ -434,22 +435,13 @@ pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
434435
where Fut: Future<Output = St>,
435436
St: Stream<Item = T>,
436437
{
437-
use crate::stream::next;
438-
futures::stream::unfold((Some(future), None), async move | (future, stream)| {
439-
match (future, stream) {
440-
(Some(future), None) => {
441-
let stream = future.await;
442-
let mut stream = Box::pin(stream);
443-
let item = next(&mut stream).await;
444-
item.map(|item| (item, (None, Some(stream))))
445-
},
446-
(None, Some(mut stream)) => {
447-
let item = next(&mut stream).await;
448-
item.map(|item| (item, (None, Some(stream))))
449-
},
450-
_ => unreachable!()
438+
async_stream_block! {
439+
let stream = future.await;
440+
#[for_await]
441+
for item in stream {
442+
yield item
451443
}
452-
})
444+
}
453445
}
454446

455447
/// Convert this future into a single element stream.
@@ -474,14 +466,10 @@ pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
474466
pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
475467
where Fut: Future,
476468
{
477-
futures::stream::unfold(Some(future), async move |future| {
478-
if let Some(future) = future {
479-
let item = future.await;
480-
Some((item, (None)))
481-
} else {
482-
None
483-
}
484-
})
469+
async_stream_block! {
470+
let item = future.await;
471+
yield item
472+
}
485473
}
486474

487475
/// Creates a new future wrapping around a function returning [`Poll`](core::task::Poll).

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![feature(async_await, async_closure, gen_future, generators)]
1+
#![feature(async_await, gen_future, generators, proc_macro_hygiene)]
22

33
pub mod future;
44
pub mod stream;

src/stream.rs

+90-117
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub use futures::stream::Stream;
22
use futures::future::Future;
3+
use futures_async_stream::async_stream_block;
34

45
use core::pin::Pin;
56
use core::iter::IntoIterator;
@@ -97,11 +98,13 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
9798
where St: Stream,
9899
F: FnMut(St::Item) -> U,
99100
{
100-
let stream = Box::pin(stream);
101-
unfold((stream, f), async move | (mut stream, mut f)| {
102-
let item = next(&mut stream).await;
103-
item.map(|item| (f(item), (stream, f)))
104-
})
101+
let mut f = f;
102+
async_stream_block! {
103+
#[for_await]
104+
for item in stream {
105+
yield f(item)
106+
}
107+
}
105108
}
106109

107110
/// Filters the values produced by this stream according to the provided
@@ -136,18 +139,15 @@ pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
136139
F: FnMut(&St::Item) -> Fut,
137140
Fut: Future<Output = bool>
138141
{
139-
let stream = Box::pin(stream);
140-
unfold((stream, f), async move | (mut stream, mut f)| {
141-
while let Some(item) = next(&mut stream).await {
142-
let matched = f(&item).await;
143-
if matched {
144-
return Some((item, (stream, f)))
145-
} else {
146-
continue;
142+
let mut f = f;
143+
async_stream_block! {
144+
#[for_await]
145+
for item in stream {
146+
if f(&item).await {
147+
yield item
147148
}
148-
};
149-
None
150-
})
149+
}
150+
}
151151
}
152152

153153
/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +183,15 @@ pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
183183
F: FnMut(St::Item) -> Fut,
184184
Fut: Future<Output = Option<U>>
185185
{
186-
let stream = Box::pin(stream);
187-
unfold((stream, f), async move | (mut stream, mut f)| {
188-
while let Some(item) = next(&mut stream).await {
186+
let mut f = f;
187+
async_stream_block! {
188+
#[for_await]
189+
for item in stream {
189190
if let Some(item) = f(item).await {
190-
return Some((item, (stream, f)))
191-
} else {
192-
continue;
191+
yield item
193192
}
194-
};
195-
None
196-
})
193+
}
194+
}
197195
}
198196

199197
/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -370,18 +368,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
370368
pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
371369
where St: Stream,
372370
{
373-
let stream = Box::pin(stream);
374-
unfold((stream, n), async move | (mut stream, n)| {
375-
if n == 0 {
376-
None
377-
} else {
378-
if let Some(item) = next(&mut stream).await {
379-
Some((item, (stream, n - 1)))
371+
let mut n = n;
372+
async_stream_block! {
373+
#[for_await]
374+
for item in stream {
375+
if n == 0 {
376+
break;
380377
} else {
381-
None
378+
n = n - 1;
379+
yield item
382380
}
383381
}
384-
})
382+
}
385383
}
386384

387385
/// Create a stream which produces the same item repeatedly.
@@ -435,27 +433,15 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
435433
where SubSt: Stream<Item = T>,
436434
St: Stream<Item = SubSt>,
437435
{
438-
let stream = Box::pin(stream);
439-
unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
440-
loop {
441-
if let Some(mut substream) = state_substream.take() {
442-
if let Some(item) = next(&mut substream).await {
443-
return Some((item, (state_stream, Some(substream))))
444-
} else {
445-
continue;
446-
}
447-
}
448-
if let Some(mut stream) = state_stream.take() {
449-
if let Some(substream) = next(&mut stream).await {
450-
let substream = Box::pin(substream);
451-
state_stream = Some(stream);
452-
state_substream = Some(substream);
453-
continue;
454-
}
436+
async_stream_block! {
437+
#[for_await]
438+
for substream in stream {
439+
#[for_await]
440+
for item in substream {
441+
yield item
455442
}
456-
return None;
457443
}
458-
})
444+
}
459445
}
460446

461447
/// Computes from this stream's items new items of a different type using
@@ -488,16 +474,14 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
488474
F: FnMut(St::Item) -> Fut,
489475
Fut: Future<Output = St::Item>
490476
{
491-
let stream = Box::pin(stream);
492-
unfold((stream, f), async move | (mut stream, mut f)| {
493-
let item = next(&mut stream).await;
494-
if let Some(item) = item {
477+
let mut f = f;
478+
async_stream_block! {
479+
#[for_await]
480+
for item in stream {
495481
let new_item = f(item).await;
496-
Some((new_item, (stream, f)))
497-
} else {
498-
None
482+
yield new_item
499483
}
500-
})
484+
}
501485
}
502486

503487
/// Creates a new stream which skips `n` items of the underlying stream.
@@ -522,22 +506,18 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
522506
pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
523507
where St: Stream,
524508
{
525-
let stream = Box::pin(stream);
526-
unfold((stream, n), async move | (mut stream, mut n)| {
527-
while n != 0 {
528-
if let Some(_) = next(&mut stream).await {
529-
n = n - 1;
530-
continue
509+
let mut n = n;
510+
async_stream_block! {
511+
#[for_await]
512+
for item in stream {
513+
if n == 0 {
514+
yield item
531515
} else {
532-
return None
516+
n = n - 1;
517+
continue;
533518
}
534519
}
535-
if let Some(item) = next(&mut stream).await {
536-
Some((item, (stream, 0)))
537-
} else {
538-
None
539-
}
540-
})
520+
}
541521
}
542522

543523
/// An adapter for zipping two streams together.
@@ -566,16 +546,18 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
566546
where St1: Stream,
567547
St2: Stream,
568548
{
569-
let stream = Box::pin(stream);
570-
let other = Box::pin(other);
571-
unfold((stream, other), async move | (mut stream, mut other)| {
572-
let left = next(&mut stream).await;
573-
let right = next(&mut other).await;
574-
match (left, right) {
575-
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
576-
_ => None
549+
let mut stream = Box::pin(stream);
550+
let mut other = Box::pin(other);
551+
async_stream_block! {
552+
loop {
553+
let left = next(&mut stream).await;
554+
let right = next(&mut other).await;
555+
match (left, right) {
556+
(Some(left), Some(right)) => yield (left, right),
557+
_ => break,
558+
}
577559
}
578-
})
560+
}
579561
}
580562

581563
/// Adapter for chaining two stream.
@@ -605,21 +587,16 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
605587
pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
606588
where St: Stream,
607589
{
608-
let stream = Box::pin(stream);
609-
let other = Box::pin(other);
610-
let start_with_first = true;
611-
unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| {
612-
if start_with_first {
613-
if let Some(item) = next(&mut stream).await {
614-
return Some((item, (stream, other, start_with_first)))
615-
}
590+
async_stream_block! {
591+
#[for_await]
592+
for item in stream {
593+
yield item
616594
}
617-
if let Some(item) = next(&mut other).await {
618-
Some((item, (stream, other, /* start_with_first */ false)))
619-
} else {
620-
None
595+
#[for_await]
596+
for item in other {
597+
yield item
621598
}
622-
})
599+
}
623600
}
624601

625602
/// Take elements from this stream while the provided asynchronous predicate
@@ -649,18 +626,17 @@ pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
649626
F: FnMut(&St::Item) -> Fut,
650627
Fut: Future<Output = bool>,
651628
{
652-
let stream = Box::pin(stream);
653-
unfold((stream, f), async move | (mut stream, mut f)| {
654-
if let Some(item) = next(&mut stream).await {
629+
let mut f = f;
630+
async_stream_block! {
631+
#[for_await]
632+
for item in stream {
655633
if f(&item).await {
656-
Some((item, (stream, f)))
634+
yield item
657635
} else {
658-
None
636+
break;
659637
}
660-
} else {
661-
None
662638
}
663-
})
639+
}
664640
}
665641

666642
/// Skip elements on this stream while the provided asynchronous predicate
@@ -690,26 +666,23 @@ pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
690666
F: FnMut(&St::Item) -> Fut,
691667
Fut: Future<Output = bool>,
692668
{
693-
let stream = Box::pin(stream);
694-
let should_skip = true;
695-
unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| {
696-
while should_skip {
697-
if let Some(item) = next(&mut stream).await {
669+
let mut f = f;
670+
let mut should_skip = true;
671+
async_stream_block! {
672+
#[for_await]
673+
for item in stream {
674+
if should_skip {
698675
if f(&item).await {
699676
continue;
700677
} else {
701-
return Some((item, (stream, f, /* should_skip */ false)))
678+
should_skip = false;
679+
yield item
702680
}
703681
} else {
704-
return None
682+
yield item
705683
}
706684
}
707-
if let Some(item) = next(&mut stream).await {
708-
Some((item, (stream, f, /* should_skip */ false)))
709-
} else {
710-
None
711-
}
712-
})
685+
}
713686
}
714687

715688
/// Execute an accumulating asynchronous computation over a stream,

0 commit comments

Comments
 (0)