Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite state loading path on startup #4166

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ class BucketManager : NonMovableOrCopyable
checkForMissingBucketsFiles(HistoryArchiveState const& has) = 0;

// Assume state from `has` in BucketList: find and attach all buckets in
// `has`, set current BL. Note: Does not restart merging
// `has`, set current BL.
virtual void assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion) = 0;
uint32_t maxProtocolVersion,
bool restartMerges) = 0;

virtual void shutdown() = 0;

Expand Down
7 changes: 5 additions & 2 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ BucketManagerImpl::checkForMissingBucketsFiles(HistoryArchiveState const& has)

void
BucketManagerImpl::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion)
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
Expand Down Expand Up @@ -1093,7 +1093,10 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
mBucketList->getLevel(i).setNext(nextFuture);
}

mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
if (restartMerges)
{
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
}
cleanupStaleFiles();
}

Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class BucketManagerImpl : public BucketManager
std::vector<std::string>
checkForMissingBucketsFiles(HistoryArchiveState const& has) override;
void assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion) override;
uint32_t maxProtocolVersion, bool restartMerges) override;
void shutdown() override;

bool isShutdown() const override;
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ ApplyBucketsWork::doWork()
CLOG_INFO(History, "ApplyBuckets : done, assuming state");

// After all buckets applied, spawn assumeState work
addWork<AssumeStateWork>(mApplyState, mMaxProtocolVersion);
addWork<AssumeStateWork>(mApplyState, mMaxProtocolVersion,
/* restartMerges */ true);
mSpawnedAssumeStateWork = true;
}

Expand Down
8 changes: 6 additions & 2 deletions src/catchup/AssumeStateWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ namespace stellar
{
AssumeStateWork::AssumeStateWork(Application& app,
HistoryArchiveState const& has,
uint32_t maxProtocolVersion)
uint32_t maxProtocolVersion,
bool restartMerges)
: Work(app, "assume-state", BasicWork::RETRY_NEVER)
, mHas(has)
, mMaxProtocolVersion(maxProtocolVersion)
, mRestartMerges(restartMerges)
{
// Maintain reference to all Buckets in HAS to avoid garbage collection,
// including future buckets that have already finished merging
Expand Down Expand Up @@ -68,8 +70,10 @@ AssumeStateWork::doWork()
// Add bucket files to BucketList and, if requested, restart merges
auto assumeStateCB = [&has = mHas,
maxProtocolVersion = mMaxProtocolVersion,
restartMerges = mRestartMerges,
&buckets = mBuckets](Application& app) {
app.getBucketManager().assumeState(has, maxProtocolVersion);
app.getBucketManager().assumeState(has, maxProtocolVersion,
restartMerges);

// Drop bucket references once assume state complete since buckets
// now referenced by BucketList
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/AssumeStateWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ class AssumeStateWork : public Work
HistoryArchiveState const& mHas;
uint32_t const mMaxProtocolVersion;
bool mWorkSpawned{false};
bool const mRestartMerges;

// Keep strong reference to buckets in HAS so they are not garbage
// collected during indexing
std::vector<std::shared_ptr<Bucket>> mBuckets{};

public:
AssumeStateWork(Application& app, HistoryArchiveState const& has,
uint32_t maxProtocolVersion);
uint32_t maxProtocolVersion, bool restartMerges);

protected:
State doWork() override;
Expand Down
11 changes: 8 additions & 3 deletions src/ledger/LedgerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,14 @@ class LedgerManager
// Called by application lifecycle events, system startup.
virtual void startNewLedger() = 0;

// loads the last ledger information from the database
// if handler is set, also loads bucket information and invokes handler.
virtual void loadLastKnownLedger(std::function<void()> handler) = 0;
// loads the last ledger information from the database with the following
// parameters:
// * restoreBucketlist indicates whether to restore the bucket list fully,
// and restart merges
// * isLedgerStateReady indicates whether the ledger state is ready or is
// still being rebuilt (in which case we can't yet load ledger entries)
virtual void loadLastKnownLedger(bool restoreBucketlist,
bool isLedgerStateReady) = 0;

// Return true if core is currently rebuilding in-memory state via local
// catchup
Expand Down
177 changes: 97 additions & 80 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,12 @@ setLedgerTxnHeader(LedgerHeader const& lh, Application& app)
}

void
LedgerManagerImpl::loadLastKnownLedger(function<void()> handler)
LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist,
bool isLedgerStateReady)
{
ZoneScoped;
auto ledgerTime = mLedgerClose.TimeScope();

// Step 1. Load LCL state from the DB and extract latest ledger hash
string lastLedger =
mApp.getPersistentState().getState(PersistentState::kLastClosedLedger);

Expand All @@ -293,103 +294,119 @@ LedgerManagerImpl::loadLastKnownLedger(function<void()> handler)
throw std::runtime_error(
"No reference in DB to any last closed ledger");
}
else
{
CLOG_INFO(Ledger, "Last closed ledger (LCL) hash is {}", lastLedger);
Hash lastLedgerHash = hexToBin256(lastLedger);

if (mApp.getConfig().MODE_STORES_HISTORY_LEDGERHEADERS)
CLOG_INFO(Ledger, "Last closed ledger (LCL) hash is {}", lastLedger);
Hash lastLedgerHash = hexToBin256(lastLedger);

// Step 2. Restore LedgerHeader from DB based on the ledger hash derived
// earlier, or verify we're at genesis if in no-history mode
std::optional<LedgerHeader> latestLedgerHeader;
if (mApp.getConfig().MODE_STORES_HISTORY_LEDGERHEADERS)
{
if (mRebuildInMemoryState)
{
if (mRebuildInMemoryState)
LedgerHeader lh;
CLOG_INFO(Ledger,
"Setting empty ledger while core rebuilds state: {}",
ledgerAbbrev(lh));
setLedgerTxnHeader(lh, mApp);
latestLedgerHeader = lh;
}
else
{
auto currentLedger =
LedgerHeaderUtils::loadByHash(getDatabase(), lastLedgerHash);
if (!currentLedger)
{
LedgerHeader lh;
CLOG_INFO(Ledger,
"Setting empty ledger while core rebuilds state: {}",
ledgerAbbrev(lh));
setLedgerTxnHeader(lh, mApp);
throw std::runtime_error("Could not load ledger from database");
}
else
HistoryArchiveState has = getLastClosedLedgerHAS();
if (currentLedger->ledgerSeq != has.currentLedger)
{
auto currentLedger = LedgerHeaderUtils::loadByHash(
getDatabase(), lastLedgerHash);
if (!currentLedger)
{
throw std::runtime_error(
"Could not load ledger from database");
}
HistoryArchiveState has = getLastClosedLedgerHAS();
if (currentLedger->ledgerSeq != has.currentLedger)
{
throw std::runtime_error(
"Invalid database state: last known "
"ledger does not agree with HAS");
}

CLOG_INFO(Ledger, "Loaded LCL header from database: {}",
ledgerAbbrev(*currentLedger));
setLedgerTxnHeader(*currentLedger, mApp);
throw std::runtime_error("Invalid database state: last known "
"ledger does not agree with HAS");
}

CLOG_INFO(Ledger, "Loaded LCL header from database: {}",
ledgerAbbrev(*currentLedger));
setLedgerTxnHeader(*currentLedger, mApp);
latestLedgerHeader = *currentLedger;
}
else
}
else
{
// In no-history mode, this method should only be called when
// the LCL is genesis.
releaseAssertOrThrow(mLastClosedLedger.hash == lastLedgerHash);
releaseAssertOrThrow(mLastClosedLedger.header.ledgerSeq ==
GENESIS_LEDGER_SEQ);
CLOG_INFO(Ledger, "LCL is genesis: {}",
ledgerAbbrev(mLastClosedLedger));
latestLedgerHeader = mLastClosedLedger.header;
}

releaseAssert(latestLedgerHeader.has_value());

// Step 3. Restore BucketList if we're doing a full core startup
// (restoreBucketlist=true), OR when using BucketListDB
if (restoreBucketlist || mApp.getConfig().isUsingBucketListDB())
{
HistoryArchiveState has = getLastClosedLedgerHAS();
auto missing = mApp.getBucketManager().checkForMissingBucketsFiles(has);
auto pubmissing = mApp.getHistoryManager()
.getMissingBucketsReferencedByPublishQueue();
missing.insert(missing.end(), pubmissing.begin(), pubmissing.end());
if (!missing.empty())
{
// In no-history mode, this method should only be called when
// the LCL is genesis.
releaseAssertOrThrow(mLastClosedLedger.hash == lastLedgerHash);
releaseAssertOrThrow(mLastClosedLedger.header.ledgerSeq ==
GENESIS_LEDGER_SEQ);
CLOG_INFO(Ledger, "LCL is genesis: {}",
ledgerAbbrev(mLastClosedLedger));
CLOG_ERROR(Ledger,
"{} buckets are missing from bucket directory '{}'",
missing.size(), mApp.getBucketManager().getBucketDir());
throw std::runtime_error("Bucket directory is corrupt");
}

if (handler)
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
{
HistoryArchiveState has = getLastClosedLedgerHAS();
auto missing =
mApp.getBucketManager().checkForMissingBucketsFiles(has);
auto pubmissing = mApp.getHistoryManager()
.getMissingBucketsReferencedByPublishQueue();
missing.insert(missing.end(), pubmissing.begin(), pubmissing.end());
if (!missing.empty())
// Only restart merges in full startup mode. Many modes in core
// (standalone offline commands, in-memory setup) do not need to
// spin up expensive merge processes.
auto assumeStateWork =
mApp.getWorkScheduler().executeWork<AssumeStateWork>(
has, latestLedgerHeader->ledgerVersion, restoreBucketlist);
if (assumeStateWork->getState() == BasicWork::State::WORK_SUCCESS)
{
CLOG_ERROR(
Ledger, "{} buckets are missing from bucket directory '{}'",
missing.size(), mApp.getBucketManager().getBucketDir());
throw std::runtime_error("Bucket directory is corrupt");
CLOG_INFO(Ledger, "Assumed bucket-state for LCL: {}",
ledgerAbbrev(*latestLedgerHeader));
}
else
{
{
LedgerTxn ltx(mApp.getLedgerTxnRoot());
auto header = ltx.loadHeader();
uint32_t ledgerVersion = header.current().ledgerVersion;
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
{
auto assumeStateWork =
mApp.getWorkScheduler()
.executeWork<AssumeStateWork>(has,
ledgerVersion);
if (assumeStateWork->getState() ==
BasicWork::State::WORK_SUCCESS)
{
CLOG_INFO(Ledger,
"Assumed bucket-state for LCL: {}",
ledgerAbbrev(header.current()));
}
else
{
// Work should only fail during graceful shutdown
releaseAssert(mApp.isStopping());
}
}
advanceLedgerPointers(header.current());
}
handler();
// Work should only fail during graceful shutdown
releaseAssertOrThrow(mApp.isStopping());
}
}
else
}

// Step 4. Restore LedgerManager's internal state
advanceLedgerPointers(*latestLedgerHeader);

if (protocolVersionStartsFrom(latestLedgerHeader->ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
if (isLedgerStateReady)
{
// Step 5. If ledger state is ready and core is at or past the Soroban
// protocol version (v20), load network configs right away
LedgerTxn ltx(mApp.getLedgerTxnRoot());
advanceLedgerPointers(ltx.loadHeader().current());
updateNetworkConfig(ltx);
}
else
{
// In some modes, e.g. in-memory, core's state is rebuilt
// asynchronously via catchup. In this case, we're not able to load
// the network config at this time, and instead must let catchup do
// it when ready.
CLOG_INFO(Ledger,
"Ledger state is being rebuilt, network config will "
"be loaded once the rebuild is done");
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ class LedgerManagerImpl : public LedgerManager

void startNewLedger(LedgerHeader const& genesisLedger);
void startNewLedger() override;
void loadLastKnownLedger(std::function<void()> handler) override;
void loadLastKnownLedger(bool restoreBucketlist,
bool isLedgerStateReady) override;
virtual bool rebuildingInMemoryState() override;
virtual void setupInMemoryStateRebuild() override;

Expand Down
Loading