@@ -96,13 +96,17 @@ cfg_unstable! {
9696 use std:: time:: Duration ;
9797
9898 use crate :: future:: Future ;
99- use crate :: stream:: FromStream ;
100- use crate :: stream:: { Product , Sum } ;
99+ use crate :: stream:: into_stream :: IntoStream ;
100+ use crate :: stream:: { FromStream , Product , Sum } ;
101101
102102 pub use merge:: Merge ;
103+ pub use flatten:: Flatten ;
104+ pub use flat_map:: FlatMap ;
103105 pub use timeout:: { TimeoutError , Timeout } ;
104106
105107 mod merge;
108+ mod flatten;
109+ mod flat_map;
106110 mod timeout;
107111}
108112
@@ -563,6 +567,76 @@ extension_trait! {
563567 Filter :: new( self , predicate)
564568 }
565569
570+ #[ doc= r#"
571+ Creates an stream that works like map, but flattens nested structure.
572+
573+ # Examples
574+
575+ Basic usage:
576+
577+ ```
578+ # async_std::task::block_on(async {
579+
580+ use std::collections::VecDeque;
581+ use async_std::prelude::*;
582+ use async_std::stream::IntoStream;
583+
584+ let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
585+ let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
586+
587+ let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
588+
589+ let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;
590+
591+ assert_eq!(v, vec![1,2,3,4,5,6]);
592+
593+ # });
594+ ```
595+ "# ]
596+ #[ cfg( feature = "unstable" ) ]
597+ #[ cfg_attr( feature = "docs" , doc( cfg( unstable) ) ) ]
598+ fn flat_map<U , F >( self , f: F ) -> FlatMap <Self , U , Self :: Item , F >
599+ where
600+ Self : Sized ,
601+ U : IntoStream ,
602+ F : FnMut ( Self :: Item ) -> U ,
603+ {
604+ FlatMap :: new( self , f)
605+ }
606+
607+ #[ doc = r#"
608+ Creates an stream that flattens nested structure.
609+
610+ # Examples
611+
612+ Basic usage:
613+
614+ ```
615+ # async_std::task::block_on(async {
616+
617+ use std::collections::VecDeque;
618+ use async_std::prelude::*;
619+
620+ let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
621+ let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
622+ let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
623+
624+ let v: Vec<_> = s.flatten().collect().await;
625+
626+ assert_eq!(v, vec![1,2,3,4,5,6]);
627+
628+ # });
629+ "# ]
630+ #[ cfg( feature = "unstable" ) ]
631+ #[ cfg_attr( feature = "docs" , doc( cfg( unstable) ) ) ]
632+ fn flatten( self ) -> Flatten <Self , Self :: Item >
633+ where
634+ Self : Sized ,
635+ Self :: Item : IntoStream ,
636+ {
637+ Flatten :: new( self )
638+ }
639+
566640 #[ doc = r#"
567641 Both filters and maps a stream.
568642
0 commit comments