Skip to content

Commit f726b6f

Browse files
committedJan 30, 2025
simulation: made header validation blocking in praos-diffusion
No significant change in adoption time benchmark.
1 parent f7fd3a2 commit f726b6f

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed
 

‎simulation/src/LeiosProtocol/Common.hs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
module LeiosProtocol.Common (
1313
module PraosProtocol.Common,
14+
module TaskMultiQueue,
1415
RankingBlockHeader,
1516
RankingBlockBody (..),
1617
RankingBlock,
@@ -49,6 +50,7 @@ import GHC.Generics
4950
import GHC.Records
5051
import PraosProtocol.Common
5152
import SimTypes
53+
import TaskMultiQueue
5254

5355
{-
5456
Note [size of blocks/messages]: we add a `size` field to most

‎simulation/src/LeiosProtocol/Short/SimP2P.hs

+4-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@ exampleTrace2' rng0 leios p2pTopography@P2PNetwork{..} =
125125
, baseChain = Genesis
126126
, slotConfig
127127
, leios
128-
, processingQueueBound = 100
129-
, processingCores = fromMaybe undefined $ Map.lookup nodeId p2pNodeCores
128+
, processingQueueBound = defaultQueueBound processingCores
129+
, processingCores
130130
, nodeId
131131
, rng
132132
}
133+
where
134+
processingCores = fromMaybe undefined $ Map.lookup nodeId p2pNodeCores

‎simulation/src/PraosProtocol/PraosNode.hs

+18-7
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,10 @@ setupPraosThreads tracer cfg queue st0 followers peers = do
161161
(ts, f) <- BlockFetch.setupValidatorThreads cfg st0.blockFetchControllerState queue
162162
let valHeader h = assert (blockInvariant h) $ do
163163
let !delay = cfg.headerValidationDelay h
164-
atomically $
165-
curry
166-
queue
167-
(CPUTask delay $ T.pack $ "ValidateHeader " ++ show (coerce @_ @Int $ blockHash h))
168-
(return ())
164+
curry
165+
(queueAndWait (atomically . queue))
166+
(CPUTask delay $ T.pack $ "ValidateHeader " ++ show (coerce @_ @Int $ blockHash h))
167+
(return ())
169168
(map Concurrently ts ++) <$> setupPraosThreads' tracer cfg valHeader f st0 followers peers
170169

171170
setupPraosThreads' ::
@@ -208,7 +207,7 @@ praosNode ::
208207
m [m ()]
209208
praosNode tracer cfg followers peers = do
210209
st0 <- PraosNodeState <$> newBlockFetchControllerState cfg.chain <*> pure Map.empty
211-
taskQueue <- atomically $ newTaskMultiQueue @() 100
210+
taskQueue <- atomically $ newTaskMultiQueue @() (defaultQueueBound cfg.processingCores)
212211
let queue = writeTMQueue taskQueue ()
213212
praosThreads <- setupPraosThreads tracer cfg.praosConfig queue st0 followers peers
214213
let cpuTasksProcessors = processCPUTasks cfg.processingCores (contramap PraosNodeEventCPU tracer) taskQueue
@@ -221,5 +220,17 @@ praosNode tracer cfg followers peers = do
221220
cfg.blockMarker
222221
st0.blockFetchControllerState.cpsVar
223222
(BlockFetch.addProducedBlock st0.blockFetchControllerState)
224-
(atomically . queue)
223+
(queueAndWait (atomically . queue))
225224
return $ cpuTasksProcessors : generationThread : map runConcurrently praosThreads
225+
226+
queueAndWait ::
227+
MonadSTM m =>
228+
((a, m ()) -> m ()) ->
229+
(a, m ()) ->
230+
m ()
231+
queueAndWait queue (d, m) = do
232+
sem <- newEmptyTMVarIO
233+
curry queue d $ do
234+
m
235+
atomically $ putTMVar sem ()
236+
atomically $ takeTMVar sem

‎simulation/src/TaskMultiQueue.hs

+6
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,9 @@ processCPUTasks (Finite n) tracer queue = newBoundedWorkerPool n [taskSource l |
8181
m
8282
-- TODO: read from var and log exception.
8383
return $ Task action var
84+
85+
defaultQueueBound :: NumCores -> Natural
86+
defaultQueueBound processingCores = do
87+
fromIntegral $ case processingCores of
88+
Infinite -> 100
89+
Finite n -> n * 2

0 commit comments

Comments
 (0)
Please sign in to comment.