@@ -9,15 +9,13 @@ use crate::progress::operate::SharedProgress;
99use crate :: progress:: frontier:: { Antichain , MutableAntichain } ;
1010
1111use crate :: Container ;
12- use crate :: container:: ContainerBuilder ;
1312use crate :: dataflow:: { Scope , StreamCore } ;
14- use crate :: dataflow:: channels:: pushers:: Tee ;
1513use crate :: dataflow:: channels:: pushers:: Counter as PushCounter ;
16- use crate :: dataflow:: channels:: pushers:: buffer :: Buffer as PushBuffer ;
14+ use crate :: dataflow:: channels:: pushers;
1715use crate :: dataflow:: channels:: pact:: ParallelizationContract ;
1816use crate :: dataflow:: channels:: pullers:: Counter as PullCounter ;
1917use crate :: dataflow:: operators:: capability:: Capability ;
20- use crate :: dataflow:: operators:: generic:: handles:: { InputHandleCore , new_input_handle, OutputWrapper } ;
18+ use crate :: dataflow:: operators:: generic:: handles:: { InputHandleCore , new_input_handle} ;
2119use crate :: dataflow:: operators:: generic:: operator_info:: OperatorInfo ;
2220use crate :: dataflow:: operators:: generic:: builder_raw:: OperatorShape ;
2321use crate :: progress:: operate:: PortConnectivity ;
@@ -90,7 +88,7 @@ impl<G: Scope> OperatorBuilder<G> {
9088 }
9189
9290 /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
93- pub fn new_output < CB : ContainerBuilder > ( & mut self ) -> ( OutputWrapper < G :: Timestamp , CB , Tee < G :: Timestamp , CB :: Container > > , StreamCore < G , CB :: Container > ) {
91+ pub fn new_output < C : Container > ( & mut self ) -> ( pushers :: Output < G :: Timestamp , C > , StreamCore < G , C > ) {
9492 let connection = ( 0 ..self . builder . shape ( ) . inputs ( ) ) . map ( |i| ( i, Antichain :: from_elem ( Default :: default ( ) ) ) ) ;
9593 self . new_output_connection ( connection)
9694 }
@@ -103,9 +101,10 @@ impl<G: Scope> OperatorBuilder<G> {
103101 ///
104102 /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
105103 /// antichain indicating that there is no connection from the input to the output.
106- pub fn new_output_connection < CB : ContainerBuilder , I > ( & mut self , connection : I ) -> (
107- OutputWrapper < G :: Timestamp , CB , Tee < G :: Timestamp , CB :: Container > > ,
108- StreamCore < G , CB :: Container >
104+ pub fn new_output_connection < C : Container , I > ( & mut self , connection : I ) -> (
105+ // OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
106+ pushers:: Output < G :: Timestamp , C > ,
107+ StreamCore < G , C > ,
109108 )
110109 where
111110 I : IntoIterator < Item = ( usize , Antichain < <G :: Timestamp as Timestamp >:: Summary > ) > + Clone ,
@@ -116,14 +115,16 @@ impl<G: Scope> OperatorBuilder<G> {
116115 let internal = Rc :: new ( RefCell :: new ( ChangeBatch :: new ( ) ) ) ;
117116 self . internal . borrow_mut ( ) . push ( Rc :: clone ( & internal) ) ;
118117
119- let mut buffer = PushBuffer :: new ( PushCounter :: new ( tee) ) ;
120- self . produced . push ( Rc :: clone ( buffer . inner ( ) . produced ( ) ) ) ;
118+ let counter = PushCounter :: new ( tee) ;
119+ self . produced . push ( Rc :: clone ( counter . produced ( ) ) ) ;
121120
122121 for ( input, entry) in connection {
123122 self . summaries [ input] . borrow_mut ( ) . add_port ( new_output, entry) ;
124123 }
125124
126- ( OutputWrapper :: new ( buffer, internal, new_output) , stream)
125+ let output = pushers:: Output :: new ( counter, internal, new_output) ;
126+ ( output, stream)
127+ // (OutputWrapper::new(buffer, internal, new_output), stream)
127128 }
128129
129130 /// Creates an operator implementation from supplied logic constructor.
@@ -222,7 +223,7 @@ impl<G: Scope> OperatorBuilder<G> {
222223
223224#[ cfg( test) ]
224225mod tests {
225- use crate :: container :: CapacityContainerBuilder ;
226+ use crate :: dataflow :: operators :: generic :: OutputBuilder ;
226227
227228 #[ test]
228229 #[ should_panic]
@@ -238,8 +239,10 @@ mod tests {
238239 let mut builder = OperatorBuilder :: new ( "Failure" . to_owned ( ) , scope. clone ( ) ) ;
239240
240241 // let mut input = builder.new_input(stream, Pipeline);
241- let ( mut output1, _stream1) = builder. new_output :: < CapacityContainerBuilder < Vec < ( ) > > > ( ) ;
242- let ( mut output2, _stream2) = builder. new_output :: < CapacityContainerBuilder < Vec < ( ) > > > ( ) ;
242+ let ( output1, _stream1) = builder. new_output :: < Vec < ( ) > > ( ) ;
243+ let ( output2, _stream2) = builder. new_output :: < Vec < ( ) > > ( ) ;
244+ let mut output1 = OutputBuilder :: from ( output1) ;
245+ let mut output2 = OutputBuilder :: from ( output2) ;
243246
244247 builder. build ( move |capabilities| {
245248 move |_frontiers| {
@@ -268,8 +271,10 @@ mod tests {
268271 let mut builder = OperatorBuilder :: new ( "Failure" . to_owned ( ) , scope. clone ( ) ) ;
269272
270273 // let mut input = builder.new_input(stream, Pipeline);
271- let ( mut output1, _stream1) = builder. new_output :: < CapacityContainerBuilder < Vec < ( ) > > > ( ) ;
272- let ( mut output2, _stream2) = builder. new_output :: < CapacityContainerBuilder < Vec < ( ) > > > ( ) ;
274+ let ( output1, _stream1) = builder. new_output :: < Vec < ( ) > > ( ) ;
275+ let ( output2, _stream2) = builder. new_output :: < Vec < ( ) > > ( ) ;
276+ let mut output1 = OutputBuilder :: from ( output1) ;
277+ let mut output2 = OutputBuilder :: from ( output2) ;
273278
274279 builder. build ( move |mut capabilities| {
275280 move |_frontiers| {
0 commit comments