@@ -711,23 +711,11 @@ relayConsumerPipelined config sst =
711
711
-- It's important not to pipeline more requests for headers when we
712
712
-- have no bodies to ask for, since (with no other guard) this will
713
713
-- put us into a busy-polling loop.
714
- --
715
- -- Question (Andrea) : Here we have a branching in the local state:
716
- -- - `rb` continues using the state that tracks we have requested more bodies
717
- --
718
- -- - `handleResponse lst'` instead doesn't know about
719
- -- more bodies requested but shrinks the pending
720
- -- requests (which makes sense, because it's the one
721
- -- used when a response is received.)
722
- --
723
- -- Both of those eventually go back to idle, but there
724
- -- doesn't seem to be a mechanism to reconcile the
725
- -- states, nor to stop one of the branches?
726
- --
727
- -- Aside from the state branching, don't you get more and
728
- -- more `RelayConsumer id header body n 'StIdle m ()` continuations to
729
- -- process (probably retaining data we no longer need?) this way?
730
714
let lst' = lst0{pendingRequests = pendingRequests'}
715
+
716
+ -- Note: the peer will proceed with only one of the two
717
+ -- arguments of Collect, depending on whether responses are
718
+ -- available or not.
731
719
return $ TS. Collect (Just rb) (handleResponse lst')
732
720
Left lst -> do
733
721
-- In this case there is nothing else to do so we block until we
@@ -751,56 +739,59 @@ relayConsumerPipelined config sst =
751
739
buffer' = lst. buffer <> Map. map (const Nothing ) ignored
752
740
(idsToAcknowledge, window') =
753
741
Seq. spanl (`Map.member` buffer') lst. window
742
+ new_buffer =
743
+ forceElemsToWHNF $ -- TODO: Do we need this?
744
+ Map. restrictKeys buffer' (Set. fromList $ Foldable. toList window')
754
745
in
755
- assertRelayConsumerLocalStateInvariant $
756
- lst
757
- { buffer =
758
- forceElemsToWHNF $ -- TODO: Do we need this?
759
- Map. restrictKeys buffer' (Set. fromList $ Foldable. toList window')
760
- , available = available'
761
- , window = window'
762
- , pendingShrink = lst. pendingShrink + fromIntegral (Seq. length idsToAcknowledge)
763
- }
746
+ assert (all isNothing $ Map. elems $ Map. difference buffer' new_buffer) $
747
+ assertRelayConsumerLocalStateInvariant $
748
+ lst
749
+ { buffer = new_buffer
750
+ , available = available'
751
+ , window = window'
752
+ , pendingShrink = lst. pendingShrink + fromIntegral (Seq. length idsToAcknowledge)
753
+ }
764
754
765
755
tryRequestBodies ::
766
756
forall (n :: N ).
767
757
RelayConsumerLocalState id header body n ->
768
758
m (Either (RelayConsumerLocalState id header body n ) (RelayConsumer id header body n 'StIdle m () ))
769
759
tryRequestBodies lst0 = do
770
- isIgnored <- config. shouldIgnore
771
- atomically $ do
772
- -- New headers are filtered before becoming available, but we have
773
- -- to filter `lst.available` again in the same STM tx that sets them as
774
- -- `inFlight`.
775
- inFlight <- readTVar sst. inFlightVar
776
- let ! lst =
777
- dropFromLST
778
- ( \ k hd ->
779
- k `Set.member` inFlight
780
- || isIgnored hd
781
- )
782
- lst0
783
-
784
- let hdrsToRequest =
785
- take (fromIntegral config. maxBodiesToRequest) $
786
- config. prioritize lst. available (mapMaybe (`Map.lookup` lst. available) $ Foldable. toList $ lst. window)
787
- let idsToRequest = map config. headerId hdrsToRequest
788
- let idsToRequestSet = Set. fromList idsToRequest
789
- if Set. null idsToRequestSet
790
- then return (Left lst)
791
- else do
792
- let available2 = Map. withoutKeys lst. available idsToRequestSet
793
- modifyTVar' sst. inFlightVar $ Set. union idsToRequestSet
794
- let ! lst2 = lst{pendingRequests = Succ lst. pendingRequests, available = available2}
795
- return $
796
- Right $
797
- TS. YieldPipelined
798
- (MsgRequestBodies idsToRequest)
799
- ( TS. ReceiverAwait $ \ case
800
- MsgRespondBodies bodies ->
801
- TS. ReceiverDone (CollectBodies hdrsToRequest bodies)
802
- )
803
- (requestHeadersNonBlocking lst2)
760
+ if (min (Map. size lst0. available) (fromIntegral config. maxBodiesToRequest)) == 0
761
+ then return (Left lst0)
762
+ else return . Right . TS. Effect $ do
763
+ isIgnored <- config. shouldIgnore
764
+ atomically $ do
765
+ -- New headers are filtered before becoming available, but we have
766
+ -- to filter `lst.available` again in the same STM tx that sets them as
767
+ -- `inFlight`.
768
+ inFlight <- readTVar sst. inFlightVar
769
+ let ! lst =
770
+ dropFromLST
771
+ ( \ k hd ->
772
+ k `Set.member` inFlight
773
+ || isIgnored hd
774
+ )
775
+ lst0
776
+ let hdrsToRequest =
777
+ take (fromIntegral config. maxBodiesToRequest) $
778
+ config. prioritize lst. available (mapMaybe (`Map.lookup` lst. available) $ Foldable. toList $ lst. window)
779
+ let idsToRequest = map config. headerId hdrsToRequest
780
+ let idsToRequestSet = Set. fromList idsToRequest
781
+ if Set. null idsToRequestSet
782
+ then return (idle lst)
783
+ else do
784
+ let available2 = Map. withoutKeys lst. available idsToRequestSet
785
+ modifyTVar' sst. inFlightVar $ Set. union idsToRequestSet
786
+ let ! lst2 = lst{pendingRequests = Succ lst. pendingRequests, available = available2}
787
+ return $
788
+ TS. YieldPipelined
789
+ (MsgRequestBodies idsToRequest)
790
+ ( TS. ReceiverAwait $ \ case
791
+ MsgRespondBodies bodies ->
792
+ TS. ReceiverDone (CollectBodies hdrsToRequest bodies)
793
+ )
794
+ (requestHeadersNonBlocking lst2)
804
795
805
796
windowAdjust ::
806
797
forall (n :: N ).
0 commit comments