@@ -389,13 +389,13 @@ let rec consumer_resume_cell t ~success ?in_transition cell =
389389 if Atomic. compare_and_set cell old Finished then success req
390390 else consumer_resume_cell t ~success ?in_transition cell
391391
392- let take_suspend_select ~enqueue ~ctx ~cancel_all t loc finished =
392+ let take_suspend_select ~enqueue ~ctx ~cancel_all t f loc finished =
393393 let Short cell | Long (_, cell) = loc in
394394 let kc v = begin
395395 if (Atomic. compare_and_set finished false true ) then (
396396 cancel_all () ;
397397 (* deliver value *)
398- enqueue (Ok v );
398+ enqueue (Ok (f v) );
399399 true
400400 ) else (
401401 (* reject value, let producer try again *)
@@ -447,12 +447,12 @@ let take (t : _ t) =
447447 take_suspend t (Long (Q. next_suspend t.consumers))
448448 )
449449
450- let select_of_many (type a ) (ts : a t list ) =
450+ let select_of_many (type a b ) (ts : ( a t * (a -> b) ) list ) =
451451 let finished = Atomic. make false in
452452 let cancel_fns = ref [] in
453453 let add_cancel_fn fn = cancel_fns := (fn :: ! cancel_fns) in
454454 let cancel_all () = List. iter (fun fn -> fn () ) ! cancel_fns in
455- let wait ctx enqueue (t : a t ) = begin
455+ let wait ctx enqueue (( t , f ): ( a t * ( a -> b )) ) = begin
456456 if (Atomic. fetch_and_add t.balance (- 1 )) > 0 then (
457457 (* have item, can cancel remaining stream waiters*)
458458 if Atomic. compare_and_set finished false true then (
@@ -464,14 +464,14 @@ let select_of_many (type a) (ts: a t list) =
464464 let v = consumer_resume_cell t cell
465465 ~success: (fun it -> it.kp (Ok true ); it.v)
466466 ?in_transition:None in
467- enqueue (Ok v )
467+ enqueue (Ok (f v) )
468468 ) else (
469469 (* restore old balance, because another stream was ready first. *)
470470 ignore (Atomic. fetch_and_add t.balance (+ 1 ))
471471 )
472472 ) else (
473473 let cell = Long (Q. next_suspend t.consumers) in
474- let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t cell finished in
474+ let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t f cell finished in
475475 add_cancel_fn cancel_fn
476476 )
477477 end in
0 commit comments