9
9
{-# LANGUAGE TypeApplications #-}
10
10
{-# LANGUAGE TypeFamilies #-}
11
11
{-# LANGUAGE ViewPatterns #-}
12
+ {-# LANGUAGE NoFieldSelectors #-}
12
13
13
14
module LeiosProtocol.Short.Node where
14
15
@@ -22,6 +23,7 @@ import Control.Monad.Class.MonadAsync
22
23
import Control.Monad.Class.MonadFork
23
24
import Control.Monad.Class.MonadThrow
24
25
import Control.Tracer
26
+ import Data.Bifunctor
25
27
import Data.Coerce (coerce )
26
28
import Data.Foldable (forM_ )
27
29
import Data.Ix (Ix )
@@ -121,8 +123,8 @@ data LeiosNodeTask
121
123
deriving (Eq , Ord , Ix , Bounded , Show )
122
124
123
125
type RelayIBState = RelayConsumerSharedState InputBlockId InputBlockHeader InputBlockBody
124
- type RelayEBState = RelayConsumerSharedState EndorseBlockId EndorseBlockId EndorseBlock
125
- type RelayVoteState = RelayConsumerSharedState VoteId VoteId VoteMsg
126
+ type RelayEBState = RelayConsumerSharedState EndorseBlockId ( RelayHeader EndorseBlockId ) EndorseBlock
127
+ type RelayVoteState = RelayConsumerSharedState VoteId ( RelayHeader VoteId ) VoteMsg
126
128
127
129
data LedgerState = LedgerState
128
130
@@ -136,9 +138,15 @@ data ValidationRequest m
136
138
--- Messages
137
139
--------------------------------------------------------------
138
140
141
+ data RelayHeader id = RelayHeader { id :: ! id , slot :: ! SlotNo }
142
+ deriving (Show )
143
+
144
+ instance MessageSize id => MessageSize (RelayHeader id ) where
145
+ messageSizeBytes (RelayHeader x y) = messageSizeBytes x + messageSizeBytes y
146
+
139
147
type RelayIBMessage = RelayMessage InputBlockId InputBlockHeader InputBlockBody
140
- type RelayEBMessage = RelayMessage EndorseBlockId EndorseBlockId EndorseBlock
141
- type RelayVoteMessage = RelayMessage VoteId VoteId VoteMsg
148
+ type RelayEBMessage = RelayMessage EndorseBlockId ( RelayHeader EndorseBlockId ) EndorseBlock
149
+ type RelayVoteMessage = RelayMessage VoteId ( RelayHeader VoteId ) VoteMsg
142
150
type PraosMessage = PraosNode. PraosMessage RankingBlockBody
143
151
144
152
data LeiosMessage
@@ -166,14 +174,14 @@ instance MuxBundle Leios where
166
174
type MuxMsg Leios = LeiosMessage
167
175
toFromMuxMsgBundle =
168
176
Leios
169
- { protocolIB = ToFromMuxMsg RelayIB fromRelayIB
170
- , protocolEB = ToFromMuxMsg RelayEB fromRelayEB
171
- , protocolVote = ToFromMuxMsg RelayVote fromRelayVote
177
+ { protocolIB = ToFromMuxMsg RelayIB ( . fromRelayIB)
178
+ , protocolEB = ToFromMuxMsg RelayEB ( . fromRelayEB)
179
+ , protocolVote = ToFromMuxMsg RelayVote ( . fromRelayVote)
172
180
, protocolPraos = case toFromMuxMsgBundle @ (PraosNode. Praos RankingBlockBody ) of
173
181
PraosNode. Praos a b -> PraosNode. Praos (p >>> a) (p >>> b)
174
182
}
175
183
where
176
- p = ToFromMuxMsg PraosMsg fromPraosMsg
184
+ p = ToFromMuxMsg PraosMsg ( . fromPraosMsg)
177
185
178
186
traverseMuxBundle f (Leios a b c d) = Leios <$> f a <*> f b <*> f c <*> traverseMuxBundle f d
179
187
@@ -230,35 +238,37 @@ relayEBConfig ::
230
238
Tracer m LeiosNodeEvent ->
231
239
LeiosNodeConfig ->
232
240
SubmitBlocks m EndorseBlockId EndorseBlock ->
233
- RelayConsumerConfig EndorseBlockId EndorseBlockId EndorseBlock m
241
+ RelayConsumerConfig EndorseBlockId ( RelayHeader EndorseBlockId ) EndorseBlock m
234
242
relayEBConfig _tracer cfg submitBlocks =
235
243
RelayConsumerConfig
236
244
{ relay = RelayConfig {maxWindowSize = 100 }
237
- , headerId = id
245
+ , headerId = ( . id )
238
246
, validateHeaders = const $ return ()
239
- , prioritize = prioritize cfg. leios. ebDiffusionStrategy $ error " FFD not supported for endorse blocks. "
247
+ , prioritize = prioritize cfg. leios. ebDiffusionStrategy ( . slot)
240
248
, submitPolicy = SubmitAll
241
249
, maxHeadersToRequest = 100
242
250
, maxBodiesToRequest = 1 -- should we chunk bodies here?
243
- , submitBlocks
251
+ , submitBlocks = \ hbs t k ->
252
+ submitBlocks (map (first (. id )) hbs) t (k . map (\ (i, b) -> (RelayHeader i b. slot, b)))
244
253
}
245
254
246
255
relayVoteConfig ::
247
256
MonadDelay m =>
248
257
Tracer m LeiosNodeEvent ->
249
258
LeiosNodeConfig ->
250
259
SubmitBlocks m VoteId VoteMsg ->
251
- RelayConsumerConfig VoteId VoteId VoteMsg m
260
+ RelayConsumerConfig VoteId ( RelayHeader VoteId ) VoteMsg m
252
261
relayVoteConfig _tracer cfg submitBlocks =
253
262
RelayConsumerConfig
254
263
{ relay = RelayConfig {maxWindowSize = 100 }
255
- , headerId = id
264
+ , headerId = ( . id )
256
265
, validateHeaders = const $ return ()
257
- , prioritize = prioritize cfg. leios. voteDiffusionStrategy $ error " FFD not supported for vote bundles. "
266
+ , prioritize = prioritize cfg. leios. voteDiffusionStrategy ( . slot)
258
267
, submitPolicy = SubmitAll
259
268
, maxHeadersToRequest = 100
260
269
, maxBodiesToRequest = 1 -- should we chunk bodies here?
261
- , submitBlocks
270
+ , submitBlocks = \ hbs t k ->
271
+ submitBlocks (map (first (. id )) hbs) t (k . map (\ (i, b) -> (RelayHeader i b. slot, b)))
262
272
}
263
273
264
274
queueAndWait :: (MonadSTM m , MonadDelay m ) => LeiosNodeState m -> LeiosNodeTask -> [CPUTask ] -> m ()
@@ -337,29 +347,29 @@ leiosNode tracer cfg followers peers = do
337
347
valHeaderRB
338
348
submitRB
339
349
praosState
340
- (map protocolPraos followers)
341
- (map protocolPraos peers)
350
+ (map ( . protocolPraos) followers)
351
+ (map ( . protocolPraos) peers)
342
352
343
353
ibThreads <-
344
354
setupRelay
345
355
(relayIBConfig tracer cfg valHeaderIB submitIB)
346
356
relayIBState
347
- (map protocolIB followers)
348
- (map protocolIB peers)
357
+ (map ( . protocolIB) followers)
358
+ (map ( . protocolIB) peers)
349
359
350
360
ebThreads <-
351
361
setupRelay
352
362
(relayEBConfig tracer cfg submitEB)
353
363
relayEBState
354
- (map protocolEB followers)
355
- (map protocolEB peers)
364
+ (map ( . protocolEB) followers)
365
+ (map ( . protocolEB) peers)
356
366
357
367
voteThreads <-
358
368
setupRelay
359
369
(relayVoteConfig tracer cfg submitVote)
360
370
relayVoteState
361
- (map protocolVote followers)
362
- (map protocolVote peers)
371
+ (map ( . protocolVote) followers)
372
+ (map ( . protocolVote) peers)
363
373
364
374
let processWaitingForRB =
365
375
processWaiting'
@@ -554,10 +564,10 @@ generator tracer cfg st = do
554
564
atomically $ modifyTVar' st. relayIBState. relayBufferVar (RB. snoc ib. header. id (ib. header, ib. body))
555
565
traceWith tracer (LeiosNodeEvent Generate (EventIB ib))
556
566
SomeAction Generate. Endorse eb -> (GenEB ,) $ do
557
- atomically $ modifyTVar' st. relayEBState. relayBufferVar (RB. snoc eb. id (eb. id , eb))
567
+ atomically $ modifyTVar' st. relayEBState. relayBufferVar (RB. snoc eb. id (RelayHeader eb. id eb . slot , eb))
558
568
traceWith tracer (LeiosNodeEvent Generate (EventEB eb))
559
569
SomeAction Generate. Vote v -> (GenVote ,) $ do
560
- atomically $ modifyTVar' st. relayVoteState. relayBufferVar (RB. snoc v. id (v. id , v))
570
+ atomically $ modifyTVar' st. relayVoteState. relayBufferVar (RB. snoc v. id (RelayHeader v. id v . slot , v))
561
571
traceWith tracer (LeiosNodeEvent Generate (EventVote v))
562
572
let LeiosNodeConfig {.. } = cfg
563
573
leiosBlockGenerator $ LeiosGeneratorConfig {submit = mapM_ submitOne, .. }
@@ -620,6 +630,9 @@ mkBuffersView cfg st = BuffersView{..}
620
630
621
631
mkSchedule :: MonadSTM m => LeiosNodeConfig -> m (SlotNo -> m [(SomeRole , Word64 )])
622
632
mkSchedule cfg = do
633
+ -- For each pipeline, we want to deploy all our votes in a single
634
+ -- message to cut down on traffic, so we pick one slot out of each
635
+ -- active voting range (they are assumed not to overlap).
623
636
votingSlots <- newTVarIO $ pickFromRanges rng1 $ votingRanges cfg. leios
624
637
mkScheduler rng2 (rates votingSlots)
625
638
where
0 commit comments