11
11
{-# LANGUAGE ScopedTypeVariables #-}
12
12
{-# LANGUAGE TypeApplications #-}
13
13
{-# LANGUAGE TypeFamilies #-}
14
+ {-# LANGUAGE TypeOperators #-}
14
15
15
16
module Ouroboros.Consensus.NodeKernel (
16
17
-- * Node kernel
@@ -45,6 +46,7 @@ import Data.List.NonEmpty (NonEmpty)
45
46
import Data.Map.Strict (Map )
46
47
import Data.Maybe (isJust , mapMaybe )
47
48
import Data.Proxy
49
+ import qualified Data.Set as Set
48
50
import qualified Data.Text as Text
49
51
import Data.Void (Void )
50
52
import Ouroboros.Consensus.Block hiding (blockMatchesHeader )
@@ -94,6 +96,7 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
94
96
import qualified Ouroboros.Network.AnchoredFragment as AF
95
97
import Ouroboros.Network.Block (castTip , tipFromHeader )
96
98
import Ouroboros.Network.BlockFetch
99
+ import qualified Ouroboros.Network.BlockFetch.ClientState as BF
97
100
import Ouroboros.Network.Diffusion (PublicPeerSelectionState )
98
101
import Ouroboros.Network.NodeToNode (ConnectionId ,
99
102
MiniProtocolParameters (.. ))
@@ -131,7 +134,7 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
131
134
, getTopLevelConfig :: TopLevelConfig blk
132
135
133
136
-- | The fetch client registry, used for the block fetch clients.
134
- , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN ) (Header blk ) blk m
137
+ , getFetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN ) (HeaderWithTime blk ) blk m
135
138
136
139
-- | The fetch mode, used by diffusion.
137
140
--
@@ -254,8 +257,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
254
257
, GSM. equivalent = (==) `on` (AF. headPoint . fst )
255
258
, GSM. getChainSyncStates = fmap cschState <$> readTVar varChainSyncHandles
256
259
, GSM. getCurrentSelection = do
257
- headers <- ChainDB. getCurrentChain chainDB
258
- extLedgerState <- ChainDB. getCurrentLedger chainDB
260
+ headers <- ChainDB. getCurrentChainWithTime chainDB
261
+ extLedgerState <- ChainDB. getCurrentLedger chainDB
259
262
return (headers, ledgerState extLedgerState)
260
263
, GSM. minCaughtUpDuration = gsmMinCaughtUpDuration
261
264
, GSM. setCaughtUpPersistentMark = \ upd ->
@@ -309,8 +312,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
309
312
-- 'addFetchedBlock' whenever a new block is downloaded.
310
313
void $ forkLinkedThread registry " NodeKernel.blockFetchLogic" $
311
314
blockFetchLogic
312
- (blockFetchDecisionTracer tracers)
313
- (blockFetchClientTracer tracers)
315
+ (contramap ( map ( fmap ( fmap ( map castPoint)))) $ blockFetchDecisionTracer tracers)
316
+ (contramap ( fmap castTraceFetchClientState) $ blockFetchClientTracer tracers)
314
317
blockFetchInterface
315
318
fetchClientRegistry
316
319
blockFetchConfiguration
@@ -344,6 +347,45 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
344
347
blockForging' <- traverse (forkBlockForging st) blockForging
345
348
go blockForging'
346
349
350
+ castTraceFetchClientState ::
351
+ forall blk . HasHeader (Header blk )
352
+ => TraceFetchClientState (HeaderWithTime blk ) -> TraceFetchClientState (Header blk )
353
+ castTraceFetchClientState = mapTraceFetchClientState hwtHeader
354
+
355
+ mapTraceFetchClientState ::
356
+ (HeaderHash h1 ~ HeaderHash h2 , HasHeader h2 )
357
+ => (h1 -> h2 ) -> TraceFetchClientState h1 -> TraceFetchClientState h2
358
+ mapTraceFetchClientState fheader = \ case
359
+ AddedFetchRequest request inflight inflightLimits status -> AddedFetchRequest (frequest request) (finflight inflight) inflightLimits (fstatus status)
360
+
361
+ AcknowledgedFetchRequest request -> AcknowledgedFetchRequest (frequest request)
362
+
363
+ SendFetchRequest headers gsv -> SendFetchRequest (AF. mapAnchoredFragment fheader headers) gsv
364
+
365
+ StartedFetchBatch range inflight inflightLimits status -> StartedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)
366
+ CompletedBlockFetch point inflight inflightLimits status time size -> CompletedBlockFetch (fpoint point) (finflight inflight) inflightLimits (fstatus status) time size
367
+ CompletedFetchBatch range inflight inflightLimits status -> CompletedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)
368
+ RejectedFetchBatch range inflight inflightLimits status -> RejectedFetchBatch (frange range) (finflight inflight) inflightLimits (fstatus status)
369
+
370
+ ClientTerminating i -> ClientTerminating i
371
+ where
372
+ frequest (BF. FetchRequest headers) = BF. FetchRequest $ map (AF. mapAnchoredFragment fheader) headers
373
+
374
+ finflight inflight = inflight { BF. peerFetchBlocksInFlight = fpoints (BF. peerFetchBlocksInFlight inflight) }
375
+
376
+ fstatus = \ case
377
+ BF. PeerFetchStatusShutdown -> BF. PeerFetchStatusShutdown
378
+ BF. PeerFetchStatusStarting -> BF. PeerFetchStatusStarting
379
+ BF. PeerFetchStatusAberrant -> BF. PeerFetchStatusAberrant
380
+ BF. PeerFetchStatusBusy -> BF. PeerFetchStatusBusy
381
+ BF. PeerFetchStatusReady points idle -> BF. PeerFetchStatusReady (fpoints points) idle
382
+
383
+ fpoints = Set. mapMonotonic fpoint
384
+
385
+ frange (BF. ChainRange p1 p2) = BF. ChainRange (fpoint p1) (fpoint p2)
386
+
387
+ fpoint = castPoint
388
+
347
389
{- ------------------------------------------------------------------------------
348
390
Internal node components
349
391
-------------------------------------------------------------------------------}
@@ -354,8 +396,8 @@ data InternalState m addrNTN addrNTC blk = IS {
354
396
, registry :: ResourceRegistry m
355
397
, btime :: BlockchainTime m
356
398
, chainDB :: ChainDB m blk
357
- , blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN ) (Header blk ) blk m
358
- , fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN ) (Header blk ) blk m
399
+ , blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN ) (HeaderWithTime blk ) blk m
400
+ , fetchClientRegistry :: FetchClientRegistry (ConnectionId addrNTN ) (HeaderWithTime blk ) blk m
359
401
, varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN ) (ChainSyncClientHandle m blk ))
360
402
, varGsmState :: StrictTVar m GSM. GsmState
361
403
, mempool :: Mempool m blk
@@ -394,7 +436,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
394
436
395
437
fetchClientRegistry <- newFetchClientRegistry
396
438
397
- let getCandidates :: STM m (Map (ConnectionId addrNTN ) (AnchoredFragment (Header blk )))
439
+ let getCandidates :: STM m (Map (ConnectionId addrNTN ) (AnchoredFragment (HeaderWithTime blk )))
398
440
getCandidates = viewChainSyncState varChainSyncHandles csCandidate
399
441
400
442
slotForgeTimeOracle <- BlockFetchClientInterface. initSlotForgeTimeOracle cfg chainDB
@@ -403,7 +445,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
403
445
(ChainDB. getCurrentChain chainDB)
404
446
getUseBootstrapPeers
405
447
(GSM. gsmStateToLedgerJudgement <$> readTVar varGsmState)
406
- blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN ) (Header blk ) blk m
448
+ blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN ) (HeaderWithTime blk ) blk m
407
449
blockFetchInterface = BlockFetchClientInterface. mkBlockFetchConsensusInterface
408
450
(configBlock cfg)
409
451
(BlockFetchClientInterface. defaultChainDbView chainDB)
0 commit comments