Skip to content

Commit feb3856

Browse files
committed
Avoid Rollbacks during restart
Currently when there is a restart, db-sync goes back to the latest on disk ledger snapshot. It also deletes all data in the db back to this point as it maintains the property that ledger and db are always at the same point. However in most cases deleting the db is unnecessary and db-sync has to reinsert all the deleted data. With this pr we change the property: now that the db tip point is never behind the ledger. So on a relatively long rollback received from the chainsync server, db-sync won't delete any db-blocks. It will apply each new block to the ledger state and it will check if the block is already in the db. If it's already there it just goes to the next block. If it's not, it first cleans up the db for any existing blocks with greater or equal BlockNo and then inserts the block. After that the initial property, ledger and db at the same point, is restored. As an optimization, when db-sync doesn't do a full rollback, ie doesn't delete the db, it will change its ConsistentLevel to DBAheadOfLedger. When the consistency between ledger and db is restored it will change the level back to Consistent. The query I mentioned above only happens with DBAheadOfLedger, so in the normal syncing case no additional queries are executed.
1 parent 73742e9 commit feb3856

File tree

11 files changed

+159
-81
lines changed

11 files changed

+159
-81
lines changed

cardano-db-sync/src/Cardano/DbSync/Api.hs

+21
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ module Cardano.DbSync.Api
88
( SyncEnv (..)
99
, LedgerEnv (..)
1010
, SyncOptions (..)
11+
, ConsistentLevel (..)
12+
, setConsistentLevel
13+
, isConsistent
1114
, mkSyncEnvFromConfig
1215
, replaceConnection
1316
, verifyFilePoints
@@ -71,6 +74,7 @@ data SyncEnv = SyncEnv
7174
, envSystemStart :: !SystemStart
7275
, envConnString :: ConnectionString
7376
, envBackend :: !(StrictTVar IO (Strict.Maybe SqlBackend))
77+
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
7478
, envOptions :: !SyncOptions
7579
, envCache :: !Cache
7680
, envOfflineWorkQueue :: !(TBQueue IO PoolFetchRetry)
@@ -81,6 +85,21 @@ data SyncEnv = SyncEnv
8185
, envLedger :: !LedgerEnv
8286
}
8387

88+
data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked
89+
deriving (Show, Eq)
90+
91+
setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO ()
92+
setConsistentLevel env cst = do
93+
logInfo (getTrace env) $ "Setting ConsistencyLevel to " <> textShow cst
94+
atomically $ writeTVar (envConsistentLevel env) cst
95+
96+
isConsistent :: SyncEnv -> IO Bool
97+
isConsistent env = do
98+
cst <- readTVarIO (envConsistentLevel env)
99+
case cst of
100+
Consistent -> pure True
101+
_ -> pure False
102+
84103
data SyncOptions = SyncOptions
85104
{ soptExtended :: !Bool
86105
, soptAbortOnInvalid :: !Bool
@@ -198,6 +217,7 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart dir = do
198217
(snapshotEveryFollowing syncOptions) (snapshotEveryLagging syncOptions)
199218
cache <- if soptCache syncOptions then newEmptyCache 100000 else pure uninitiatedCache
200219
backendVar <- newTVarIO Strict.Nothing
220+
consistentLevelVar <- newTVarIO Unchecked
201221
owq <- newTBQueueIO 100
202222
orq <- newTBQueueIO 100
203223
epochVar <- newTVarIO initEpochState
@@ -210,6 +230,7 @@ mkSyncEnv trce connSring syncOptions protoInfo nw nwMagic systemStart dir = do
210230
, envConnString = connSring
211231
, envBackend = backendVar
212232
, envOptions = syncOptions
233+
, envConsistentLevel = consistentLevelVar
213234
, envCache = cache
214235
, envOfflineWorkQueue = owq
215236
, envOfflineResultQueue = orq

cardano-db-sync/src/Cardano/DbSync/Cache.hs

+6-6
Original file line numberDiff line numberDiff line change
@@ -200,24 +200,24 @@ newEmptyCache maCapacity =
200200
-- NOTE: For 'StakeAddresses' we use a mixed approach. If the rollback is long we just drop
201201
-- everything, since it is very rare. If not, we query all the StakeAddressesId of blocks
202202
-- that wil be deleted.
203-
rollbackCache :: MonadIO m => Cache -> BlockNo -> Word64 -> ReaderT SqlBackend m ()
204-
rollbackCache cache (BlockNo blkNo) nBlocks =
203+
rollbackCache :: MonadIO m => Cache -> BlockNo -> Bool -> Word64 -> ReaderT SqlBackend m ()
204+
rollbackCache cache (BlockNo blkNo) deleteEq nBlocks =
205205
case cache of
206206
UninitiatedCache -> pure ()
207207
Cache ci -> do
208208
liftIO $ do
209209
atomically $ writeTVar (cPools ci) Map.empty
210210
atomically $ modifyTVar (cMultiAssets ci) LRU.cleanup
211211
atomically $ writeTVar (cPrevBlock ci) Nothing
212-
rollbackStakeAddr ci blkNo nBlocks
212+
rollbackStakeAddr ci blkNo deleteEq nBlocks
213213

214-
rollbackStakeAddr :: MonadIO m => CacheInternal -> Word64 -> Word64 -> ReaderT SqlBackend m ()
215-
rollbackStakeAddr ci blkNo nBlocks = do
214+
rollbackStakeAddr :: MonadIO m => CacheInternal -> Word64 -> Bool -> Word64 -> ReaderT SqlBackend m ()
215+
rollbackStakeAddr ci blkNo deleteEq nBlocks = do
216216
if nBlocks > 600
217217
then liftIO $ atomically $ writeTVar (cStakeCreds ci) Map.empty
218218
else do
219219
initMp <- liftIO $ readTVarIO (cStakeCreds ci)
220-
stakeAddrIdsToDelete <- DB.queryStakeAddressIdsAfter blkNo
220+
stakeAddrIdsToDelete <- DB.queryStakeAddressIdsAfter blkNo deleteEq
221221
let stakeAddrIdsSetToDelete = Set.fromList stakeAddrIdsToDelete
222222
let !mp = Map.filter (`Set.notMember` stakeAddrIdsSetToDelete) initMp
223223
liftIO $ atomically $ writeTVar (cStakeCreds ci) mp

cardano-db-sync/src/Cardano/DbSync/Database.hs

+8-2
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,17 @@ runActions env actions = do
8989
case spanDbApply xs of
9090
([], DbFinish:_) -> do
9191
pure Done
92-
([], DbRollBackToPoint pt resultVar : ys) -> do
93-
newExceptT $ rollbackToPoint env pt
92+
([], DbRollBackToPoint pt serverTip resultVar : ys) -> do
93+
deletedAllBlocks <- newExceptT $ rollbackToPoint env pt serverTip
9494
points <- if hasLedgerState env
9595
then lift $ rollbackLedger env pt
9696
else pure Nothing
97+
-- Ledger state always rollbacks at least back to the 'point' given by the Node.
98+
-- It needs to rollback even further, if 'points' is not 'Nothing'.
99+
-- The db may not rollback to the Node point.
100+
case (deletedAllBlocks, points) of
101+
(True, Nothing) -> liftIO $ setConsistentLevel env Consistent
102+
_ -> liftIO $ setConsistentLevel env DBAheadOfLedger
97103
blockNo <- lift $ getDbTipBlockNo env
98104
lift $ atomically $ putTMVar resultVar (points, blockNo)
99105
dbAction Continue ys

cardano-db-sync/src/Cardano/DbSync/DbAction.hs

+5-5
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ import qualified Control.Concurrent.STM.TBQueue as TBQ
2222

2323
import Control.Monad.Class.MonadSTM.Strict (StrictTMVar, newEmptyTMVarIO, takeTMVar)
2424

25-
import Ouroboros.Network.Block (BlockNo)
25+
import Ouroboros.Network.Block (BlockNo, Tip (..))
2626
import qualified Ouroboros.Network.Point as Point
2727

2828
data DbAction
2929
= DbApplyBlock !CardanoBlock
30-
| DbRollBackToPoint !CardanoPoint (StrictTMVar IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo))
30+
| DbRollBackToPoint !CardanoPoint !(Tip CardanoBlock) (StrictTMVar IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo))
3131
| DbFinish
3232

3333
newtype DbActionQueue = DbActionQueue
@@ -39,10 +39,10 @@ mkDbApply = DbApplyBlock
3939

4040
-- | This simulates a synhronous operations, since the thread waits for the db
4141
-- worker thread to finish the rollback.
42-
waitRollback :: DbActionQueue -> CardanoPoint -> IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo)
43-
waitRollback queue point = do
42+
waitRollback :: DbActionQueue -> CardanoPoint -> Tip CardanoBlock -> IO (Maybe [CardanoPoint], Point.WithOrigin BlockNo)
43+
waitRollback queue point serverTip = do
4444
resultVar <- newEmptyTMVarIO
45-
atomically $ writeDbActionQueue queue $ DbRollBackToPoint point resultVar
45+
atomically $ writeDbActionQueue queue $ DbRollBackToPoint point serverTip resultVar
4646
atomically $ takeTMVar resultVar
4747

4848
lengthDbActionQueue :: DbActionQueue -> STM Natural

cardano-db-sync/src/Cardano/DbSync/Default.hs

+44-20
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock)
2525
import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefunds, insertRewards)
2626
import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards)
2727
import Cardano.DbSync.Error
28-
2928
import Cardano.DbSync.LedgerState (ApplyResult (..), LedgerEvent (..),
3029
applyBlockAndSnapshot, defaultApplyResult)
3130
import Cardano.DbSync.LocalStateQuery
32-
31+
import Cardano.DbSync.Rollback
3332
import Cardano.DbSync.Types
3433
import Cardano.DbSync.Util
3534

@@ -41,6 +40,7 @@ import Control.Monad.Logger (LoggingT)
4140
import Control.Monad.Trans.Control (MonadBaseControl)
4241
import Control.Monad.Trans.Except.Extra (newExceptT)
4342

43+
import qualified Data.ByteString.Short as SBS
4444
import qualified Data.Map.Strict as Map
4545
import qualified Data.Set as Set
4646
import qualified Data.Strict.Maybe as Strict
@@ -49,6 +49,8 @@ import Database.Persist.SqlBackend.Internal
4949
import Database.Persist.SqlBackend.Internal.StatementCache
5050

5151
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))
52+
import qualified Ouroboros.Consensus.HardFork.Combinator as Consensus
53+
import Ouroboros.Network.Block (blockHash, blockNo, getHeaderFields)
5254

5355

5456
insertListBlocks
@@ -58,38 +60,68 @@ insertListBlocks env blocks = do
5860
backend <- getBackend env
5961
DB.runDbIohkLogging backend tracer .
6062
runExceptT $ do
61-
traverse_ (applyAndInsert env) blocks
63+
traverse_ (applyAndInsertBlockMaybe env) blocks
6264
where
6365
tracer = getTrace env
6466

65-
applyAndInsert
67+
applyAndInsertBlockMaybe
6668
:: SyncEnv -> CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
67-
applyAndInsert env cblk = do
69+
applyAndInsertBlockMaybe env cblk = do
6870
!applyRes <- liftIO mkApplyResult
71+
bl <- liftIO $ isConsistent env
72+
if bl then
73+
-- In the usual case it will be consistent so we don't need to do any queries. Just insert the block
74+
insertBlock env cblk applyRes False
75+
else do
76+
blockIsInDbAlready <- lift (isRight <$> DB.queryBlockHash (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk))
77+
-- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or
78+
-- equal, insert the block and restore consistency between ledger and db.
79+
unless blockIsInDbAlready $ do
80+
liftIO . logInfo tracer $
81+
mconcat
82+
[ "Received block which is not in the db ", textShow (getHeaderFields cblk)
83+
, " Time to restore consistency."]
84+
rollbackFromBlockNo env (blockNo cblk)
85+
insertBlock env cblk applyRes True
86+
liftIO $ setConsistentLevel env Consistent
87+
where
88+
tracer = getTrace env
89+
90+
mkApplyResult :: IO ApplyResult
91+
mkApplyResult = do
92+
if hasLedgerState env then
93+
applyBlockAndSnapshot (envLedger env) cblk
94+
else do
95+
slotDetails <- getSlotDetailsNode (envNoLedgerEnv env) (cardanoBlockSlotNo cblk)
96+
pure $ defaultApplyResult slotDetails
97+
98+
insertBlock
99+
:: SyncEnv -> CardanoBlock -> ApplyResult -> Bool -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
100+
insertBlock env cblk applyRes logBlock = do
69101
!epochEvents <- liftIO $ atomically $ generateNewEpochEvents env (apSlotDetails applyRes)
70102
let !applyResult = applyRes { apEvents = sort $ epochEvents <> apEvents applyRes}
71103
let !details = apSlotDetails applyResult
72104
insertLedgerEvents env (sdEpochNo details) (apEvents applyResult)
73105
insertEpoch details
74-
let firstBlockOfEpoch = hasEpochStartEvent (apEvents applyResult)
106+
let shouldLog = hasEpochStartEvent (apEvents applyResult) || logBlock
75107
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)
76108
case cblk of
77109
BlockByron blk ->
78-
newExceptT $ insertByronBlock env firstBlockOfEpoch blk details
110+
newExceptT $ insertByronBlock env shouldLog blk details
79111
BlockShelley blk -> newExceptT $
80-
insertShelleyBlock env firstBlockOfEpoch (Generic.fromShelleyBlock blk)
112+
insertShelleyBlock env shouldLog (Generic.fromShelleyBlock blk)
81113
details isMember (apNewEpoch applyResult) (apStakeSlice applyResult)
82114
BlockAllegra blk -> newExceptT $
83-
insertShelleyBlock env firstBlockOfEpoch (Generic.fromAllegraBlock blk)
115+
insertShelleyBlock env shouldLog (Generic.fromAllegraBlock blk)
84116
details isMember (apNewEpoch applyResult) (apStakeSlice applyResult)
85117
BlockMary blk -> newExceptT $
86-
insertShelleyBlock env firstBlockOfEpoch (Generic.fromMaryBlock blk)
118+
insertShelleyBlock env shouldLog (Generic.fromMaryBlock blk)
87119
details isMember (apNewEpoch applyResult) (apStakeSlice applyResult)
88120
BlockAlonzo blk -> newExceptT $
89-
insertShelleyBlock env firstBlockOfEpoch (Generic.fromAlonzoBlock (getPrices applyResult) blk)
121+
insertShelleyBlock env shouldLog (Generic.fromAlonzoBlock (getPrices applyResult) blk)
90122
details isMember (apNewEpoch applyResult) (apStakeSlice applyResult)
91123
BlockBabbage blk -> newExceptT $
92-
insertShelleyBlock env firstBlockOfEpoch (Generic.fromBabbageBlock (getPrices applyResult) blk)
124+
insertShelleyBlock env shouldLog (Generic.fromBabbageBlock (getPrices applyResult) blk)
93125
details isMember (apNewEpoch applyResult) (apStakeSlice applyResult)
94126
where
95127
tracer = getTrace env
@@ -103,14 +135,6 @@ applyAndInsert env cblk = do
103135
Strict.Nothing | hasLedgerState env -> Just $ Ledger.Prices minBound minBound
104136
Strict.Nothing -> Nothing
105137

106-
mkApplyResult :: IO ApplyResult
107-
mkApplyResult = do
108-
if hasLedgerState env then
109-
applyBlockAndSnapshot (envLedger env) cblk
110-
else do
111-
slotDetails <- getSlotDetailsNode (envNoLedgerEnv env) (cardanoBlockSlotNo cblk)
112-
pure $ defaultApplyResult slotDetails
113-
114138
-- -------------------------------------------------------------------------------------------------
115139

116140
insertLedgerEvents

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ insertShelleyBlock
8686
=> SyncEnv -> Bool -> Generic.Block -> SlotDetails
8787
-> IsPoolMember -> Strict.Maybe Generic.NewEpoch -> Generic.StakeSliceRes
8888
-> ReaderT SqlBackend m (Either SyncNodeError ())
89-
insertShelleyBlock env firstBlockOfEpoch blk details isMember mNewEpoch stakeSlice = do
89+
insertShelleyBlock env shouldLog blk details isMember mNewEpoch stakeSlice = do
9090
runExceptT $ do
9191
mPhid <- lift $ queryPoolKeyWithCache cache CacheNew $ coerceKeyRole $ Generic.blkSlotLeader blk
9292
slid <- lift . DB.insertSlotLeader $ Generic.mkSlotLeader (Generic.unKeyHashRaw $ Generic.blkSlotLeader blk) (eitherToMaybe mPhid)
@@ -152,7 +152,7 @@ insertShelleyBlock env firstBlockOfEpoch blk details isMember mNewEpoch stakeSli
152152
where
153153
logger :: Bool -> Trace IO a -> a -> IO ()
154154
logger followingClosely
155-
| firstBlockOfEpoch = logInfo
155+
| shouldLog = logInfo
156156
| followingClosely = logInfo
157157
| unBlockNo (Generic.blkBlockNo blk) `mod` 5000 == 0 = logInfo
158158
| otherwise = logDebug

cardano-db-sync/src/Cardano/DbSync/Rollback.hs

+46-35
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
module Cardano.DbSync.Rollback
66
( rollbackToPoint
7+
, rollbackFromBlockNo
78
, unsafeRollback
89
) where
910

@@ -21,58 +22,68 @@ import Cardano.DbSync.Error
2122
import Cardano.DbSync.Types
2223
import Cardano.DbSync.Util
2324

24-
import qualified Data.List as List
2525
import Database.Persist.Sql (SqlBackend)
2626

2727
import Ouroboros.Consensus.HardFork.Combinator.AcrossEras (getOneEraHash)
2828

2929
import Ouroboros.Network.Block
3030
import Ouroboros.Network.Point
3131

32+
-- | The decision to delete blocks has been taken and this executes it.
33+
deleteBlocks :: MonadIO m => SyncEnv -> BlockNo -> Bool -> Word64 -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
34+
deleteBlocks env blkNo deleteEq nBlocks = do
35+
unless (nBlocks == 0) $
36+
liftIO . logInfo trce $
37+
mconcat
38+
[ "Deleting ", textShow nBlocks, " blocks after "
39+
, if deleteEq then " or equal to " else ""
40+
, textShow blkNo
41+
]
42+
-- We need to first cleanup the cache and then delete the blocks from db.
43+
lift $ rollbackCache cache blkNo deleteEq nBlocks
44+
deleted <- lift $ DB.deleteAfterBlockNo blkNo deleteEq
45+
liftIO . logInfo trce $
46+
if deleted
47+
then "Blocks deleted"
48+
else "No blocks need to be deleted"
49+
where
50+
trce :: Trace IO Text
51+
trce = getTrace env
52+
53+
cache :: Cache
54+
cache = envCache env
55+
56+
rollbackFromBlockNo :: MonadIO m => SyncEnv -> BlockNo -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
57+
rollbackFromBlockNo env blkNo = do
58+
nBlocks <- lift $ DB.queryBlockCountAfterBlockNo (unBlockNo blkNo) True
59+
deleteBlocks env blkNo True (fromIntegral nBlocks)
60+
3261
-- Rollbacks are done in an Era generic way based on the 'Point' we are
3362
-- rolling back to.
34-
rollbackToPoint :: SyncEnv -> CardanoPoint -> IO (Either SyncNodeError ())
35-
rollbackToPoint env point = do
63+
rollbackToPoint :: SyncEnv -> CardanoPoint -> Tip CardanoBlock -> IO (Either SyncNodeError Bool)
64+
rollbackToPoint env point serverTip = do
3665
backend <- getBackend env
3766
DB.runDbIohkNoLogging backend $ runExceptT action
3867
where
3968
trce :: Trace IO Text
4069
trce = getTrace env
4170

42-
cache :: Cache
43-
cache = envCache env
44-
45-
action :: MonadIO m => ExceptT SyncNodeError (ReaderT SqlBackend m) ()
71+
action :: MonadIO m => ExceptT SyncNodeError (ReaderT SqlBackend m) Bool
4672
action = do
47-
liftIO . logInfo trce $ "Rolling back to " <> renderPoint point
48-
xs <- lift $ slotsToDelete (pointSlot point)
49-
unless (null xs) $
50-
-- there may be more deleted blocks than slots, because ebbs don't have
51-
-- a slot. We can only make an estimation here.
73+
blkNo <- liftLookupFail "Rollback.rollbackToPoint.queryBlockNo" $ queryBlockNo point
74+
nBlocks <- lift $ DB.queryBlockCountAfterBlockNo (unBlockNo blkNo) False
75+
if nBlocks <= 50 || not (hasLedgerState env) then do
76+
liftIO . logInfo trce $ "Rolling back to " <> renderPoint point
77+
deleteBlocks env blkNo False (fromIntegral nBlocks)
78+
pure True
79+
else do
5280
liftIO . logInfo trce $
53-
mconcat
54-
[ "Deleting ", textShow (length xs), " blocks up to slot "
55-
, textShow (unSlotNo $ List.head xs)
56-
]
57-
-- We delete the block right after the point we rollback to. This delete
58-
-- should cascade to the rest of the chain.
59-
blkNo <- liftLookupFail "Rollback.rollbackToPoint" $ queryBlockNo point
60-
-- 'length xs' here gives an approximation of the blocks deleted. An approximation
61-
-- is good enough, since it is only used to decide on the best policy and is not
62-
-- important for correctness.
63-
-- We need to first cleanup the cache and then delete the blocks from db.
64-
lift $ rollbackCache cache blkNo (fromIntegral $ length xs)
65-
deleted <- lift $ DB.deleteAfterBlockNo blkNo
66-
liftIO . logInfo trce $
67-
if deleted
68-
then "Blocks deleted"
69-
else "No blocks need to be deleted"
70-
71-
slotsToDelete :: MonadIO m => WithOrigin SlotNo -> ReaderT SqlBackend m [SlotNo]
72-
slotsToDelete wosl =
73-
case wosl of
74-
Origin -> DB.querySlotNos
75-
At sl -> DB.querySlotNosGreaterThan (unSlotNo sl)
81+
mconcat
82+
[ "Delaying rollback of ", textShow nBlocks, " blocks after "
83+
, textShow blkNo, " back to " , renderPoint point
84+
, ". Applying blocks up to current node ", textShow serverTip
85+
]
86+
pure False
7687

7788
queryBlockNo :: MonadIO m => Point CardanoBlock -> ReaderT SqlBackend m (Either DB.LookupFail BlockNo)
7889
queryBlockNo pnt =

0 commit comments

Comments
 (0)