Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4
--sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y=
-- from coot/dmq-related-changes
tag: 625296c92363b8c5e77cddee40de4525421d2660
--sha256: sha256-WRbKqNimAsYtgj/r3SJ0IT6z7+Q3XZf3p89BM9w6bF8=
subdir:
acts-generic
cardano-diffusion
Expand Down
19 changes: 12 additions & 7 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
module Main where

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.Class.MonadMVar
import Control.Monad (void, when)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer (..), nullTracer, traceWith)
Expand All @@ -33,8 +34,6 @@ import System.IOManager (withIOManager)

import Cardano.Git.Rev (gitRev)
import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)
import Cardano.Ledger.Keys (VKey (..))
import Cardano.Ledger.Hashes (hashKey)

import DMQ.Configuration
import DMQ.Configuration.CLIOptions (parseCLIOptions)
Expand Down Expand Up @@ -93,8 +92,13 @@ runDMQ commandLineConfig = do
} = config' <> commandLineConfig
`act`
defaultConfiguration
let tracer :: ToJSON ev => Tracer IO (WithEventType ev)
tracer = dmqTracer prettyLog

lock <- newMVar ()
let tracer', tracer :: ToJSON ev => Tracer IO (WithEventType ev)
tracer' = dmqTracer prettyLog
-- use a lock to prevent writing two lines at the same time
-- TODO: this won't be needed with `cardano-tracer` integration
tracer = Tracer $ \a -> withMVar lock $ \_ -> traceWith tracer' a

when version $ do
let gitrev = $(gitRev)
Expand All @@ -119,6 +123,7 @@ runDMQ commandLineConfig = do

stdGen <- newStdGen
let (psRng, policyRng) = split stdGen
policyRngVar <- newTVarIO policyRng

-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
withIOManager \iocp -> do
Expand Down Expand Up @@ -149,7 +154,7 @@ runDMQ commandLineConfig = do
Mempool.getWriter SigDuplicate
sigId
(\now sigs ->
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
)
(traverse_ $ \(sigid, reason) -> do
traceWith ntnValidationTracer $ InvalidSignature sigid reason
Expand Down Expand Up @@ -183,7 +188,7 @@ runDMQ commandLineConfig = do
Mempool.getWriter SigDuplicate
sigId
(\now sigs ->
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
)
(traverse_ $ \(sigid, reason) ->
traceWith ntcValidationTracer $ InvalidSignature sigid reason
Expand Down Expand Up @@ -212,7 +217,7 @@ runDMQ commandLineConfig = do
dmqLimitsAndTimeouts
dmqNtNApps
dmqNtCApps
(policy policyRng)
(policy policyRngVar)

Diffusion.run dmqDiffusionArguments
(dmqDiffusionTracers dmqConfig tracer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Breaking

- `validateSig`: removed the hashing function for cold key from arguments, added required constraints ledger's `hashKey . VKey` usage instead

### Non-Breaking

- Added a lock to avoid race conditions between trace events.
- Improved peer selection policy.

1 change: 0 additions & 1 deletion dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ executable dmq-node
base,
bytestring,
cardano-git-rev,
cardano-ledger-core,
contra-tracer >=0.1 && <0.3,
dmq-node,
io-classes:{io-classes, strict-stm},
Expand Down
11 changes: 6 additions & 5 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module DMQ.Diffusion.NodeKernel
, withNodeKernel
, PoolValidationCtx (..)
, StakePools (..)
, PoolId
) where

import Control.Concurrent.Class.MonadMVar
Expand Down Expand Up @@ -33,8 +34,8 @@ import Data.Word
import System.Random (StdGen)
import System.Random qualified as Random

import Cardano.Ledger.Shelley.API hiding (I)
import Ouroboros.Consensus.Shelley.Ledger.Query
import Cardano.Ledger.Shelley.API qualified as Ledger
import Ouroboros.Consensus.Shelley.Ledger.Query qualified as LedgerQuery

import Ouroboros.Network.BlockFetch (FetchClientRegistry,
newFetchClientRegistry)
Expand Down Expand Up @@ -76,13 +77,13 @@ data NodeKernel crypto ntnAddr m =

-- | Cardano pool id's are hashes of the cold verification key
--
type PoolId = KeyHash StakePool
type PoolId = Ledger.KeyHash Ledger.StakePool

data StakePools m = StakePools {
-- | contains map of cardano pool stake snapshot obtained
-- via local state query client
stakePoolsVar
:: !(StrictTVar m (Map PoolId StakeSnapshot))
:: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot))
-- | Acquire and update validation context for signature validation
, withPoolValidationCtx
:: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a
Expand All @@ -99,7 +100,7 @@ data PoolValidationCtx =
PoolValidationCtx {
vctxEpoch :: !(Maybe UTCTime)
-- ^ UTC time of next epoch boundary for handling clock skew
, vctxStakeMap :: !(Map PoolId StakeSnapshot)
, vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot)
-- ^ for signature validation
, vctxOcertMap :: !(Map PoolId Word64)
-- ^ ocert counters to check monotonicity
Expand Down
140 changes: 109 additions & 31 deletions dmq-node/src/DMQ/Diffusion/PeerSelection.hs
Original file line number Diff line number Diff line change
@@ -1,40 +1,118 @@
module DMQ.Diffusion.PeerSelection where

import Data.Set (Set)
import Control.Concurrent.Class.MonadSTM.Strict
import Data.List (sortOn, unfoldr)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Network.Socket (SockAddr)
import Ouroboros.Network.PeerSelection.Governor.Types
import System.Random (Random (..), StdGen)
import Data.Word (Word32)
import Ouroboros.Network.PeerSelection
import System.Random (Random (..), StdGen, split)

-- | Trivial peer selection policy used as dummy value
--
policy :: StdGen -> PeerSelectionPolicy SockAddr IO
policy gen =
policy :: forall peerAddr m.
( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> PeerSelectionPolicy peerAddr m
policy rngVar =
PeerSelectionPolicy {
policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially
, policyPickColdPeersToForget = \_ _ _ -> pickTrivially
, policyPickColdPeersToPromote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially
, policyPickHotPeersToDemote = \_ _ _ -> pickTrivially
, policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially
, policyPickInboundPeers = \_ _ _ -> pickTrivially
, policyFindPublicRootTimeout = 5
, policyMaxInProgressPeerShareReqs = 0
, policyPeerShareRetryTime = 0 -- seconds
, policyPeerShareBatchWaitTime = 0 -- seconds
, policyPeerShareOverallTimeout = 0 -- seconds
, policyPeerShareActivationDelay = 2 -- seconds
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
policyPickWarmPeersToPromote = simplePromotionPolicy,
policyPickInboundPeers = simplePromotionPolicy,

policyPickHotPeersToDemote = hotDemotionPolicy,
policyPickWarmPeersToDemote = warmDemotionPolicy,
policyPickColdPeersToForget = coldForgetPolicy,

policyFindPublicRootTimeout = 5,
policyMaxInProgressPeerShareReqs = 0,
policyPeerShareRetryTime = 0, -- seconds
policyPeerShareBatchWaitTime = 0, -- seconds
policyPeerShareOverallTimeout = 0, -- seconds
policyPeerShareActivationDelay = 2 -- seconds
}
where
pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr)
pickTrivially set n = pure
. fst
$ go gen (Set.toList set) n []
where
go g _ 0 acc = (Set.fromList acc, g)
go g [] _ acc = (Set.fromList acc, g)
go g xs k acc =
let (idx, g') = randomR (0, length xs - 1) g
picked = xs !! idx
xs' = take idx xs ++ drop (idx + 1) xs
in go g' xs' (k - 1) (picked : acc)
hotDemotionPolicy :: PickPolicy peerAddr (STM m)
hotDemotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to demote, peers with knownPeerTepid set are twice
-- as likely to be demoted.
warmDemotionPolicy :: PickPolicy peerAddr (STM m)
warmDemotionPolicy _ _ isTepid available pickNum = do
available' <- addRand rngVar available (tepidWeight isTepid)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

simplePromotionPolicy :: PickPolicy peerAddr (STM m)
simplePromotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
$ available'

-- Randomly pick peers to forget, peers with failures are more likely to
-- be forgotten.
coldForgetPolicy :: PickPolicy peerAddr (STM m)
coldForgetPolicy source failCnt _ available pickNum = do
available' <- addRand rngVar available (failWeight failCnt)
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. Map.assocs
-- avoid demoting local root peers
. Map.filterWithKey (\peer _ -> source peer /= PeerSourceLocalRoot)
$ available'

-- Failures lowers r
failWeight :: (peerAddr -> Int)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
failWeight failCnt peer r =
(peer, r `div` fromIntegral (failCnt peer + 1))

-- Tepid flag cuts r in half
tepidWeight :: (peerAddr -> Bool)
-> peerAddr
-> Word32
-> (peerAddr, Word32)
tepidWeight isTepid peer r =
if isTepid peer then (peer, r `div` 2)
else (peer, r)


-- Add scaled random number in order to prevent ordering based on SockAddr
addRand :: ( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> Set.Set peerAddr
-> (peerAddr -> Word32 -> (peerAddr, Word32))
-> STM m (Map.Map peerAddr Word32)
addRand rngVar available scaleFn = do
inRng <- readTVar rngVar

let (rng, rng') = split inRng
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]
available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns
writeTVar rngVar rng'
return available'

Loading