Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanups and fixes to maintenance #4657

Merged
merged 4 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions src/catchup/CatchupConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,21 @@
namespace stellar
{

void
CatchupConfiguration::checkInvariants() const
{
if (mMode == CatchupConfiguration::Mode::LOCAL_BUCKETS_ONLY)
{
releaseAssert(mHAS && mHistoryEntry);
releaseAssert(toLedger() != CatchupConfiguration::CURRENT);
releaseAssert(count() == 0);
}
else
{
releaseAssert(!mHAS && !mHistoryEntry);
}
}

CatchupConfiguration::CatchupConfiguration(LedgerNumHashPair ledgerHashPair,
uint32_t count, Mode mode)
: mCount{count}, mLedgerHashPair{ledgerHashPair}, mMode{mode}
{
checkInvariants();
}

CatchupConfiguration::CatchupConfiguration(HistoryArchiveState has,
LedgerHeaderHistoryEntry lhhe)
: mCount(0)
, mLedgerHashPair(LedgerNumHashPair(lhhe.header.ledgerSeq,
std::make_optional(lhhe.hash)))
, mMode(CatchupConfiguration::Mode::LOCAL_BUCKETS_ONLY)
, mHAS(std::make_optional(has))
, mHistoryEntry(std::make_optional(lhhe))
{
checkInvariants();
}

CatchupConfiguration::CatchupConfiguration(uint32_t toLedger, uint32_t count,
Mode mode)
: mCount{count}, mLedgerHashPair{toLedger, std::nullopt}, mMode{mode}
{
checkInvariants();
}

CatchupConfiguration
CatchupConfiguration::resolve(uint32_t remoteCheckpoint) const
{
checkInvariants();
auto cfg = *this;
if (toLedger() == CatchupConfiguration::CURRENT)
{
Expand Down
29 changes: 1 addition & 28 deletions src/catchup/CatchupConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,13 @@ class CatchupConfiguration
// Do validity checks on all history archive file types for a given
// range, regardless of whether files are used or not
OFFLINE_COMPLETE,
ONLINE,
// Similar to online catchup, except in this mode there is no archive
// lookup and validation, rebuild local state present on disk, while
// buffering ledgers
LOCAL_BUCKETS_ONLY
ONLINE
};
static const uint32_t CURRENT = 0;

CatchupConfiguration(uint32_t toLedger, uint32_t count, Mode mode);
CatchupConfiguration(LedgerNumHashPair ledgerHashPair, uint32_t count,
Mode mode);
CatchupConfiguration(HistoryArchiveState has,
LedgerHeaderHistoryEntry lhhe);

/**
* If toLedger() == CatchupConfiguration::CURRENT it replaces it with
Expand Down Expand Up @@ -104,31 +98,10 @@ class CatchupConfiguration
return mMode == Mode::ONLINE;
}

bool
localBucketsOnly() const
{
return mMode == Mode::LOCAL_BUCKETS_ONLY;
}

std::optional<HistoryArchiveState>
getHAS() const
{
return mHAS;
}

std::optional<LedgerHeaderHistoryEntry>
getHistoryEntry() const
{
return mHistoryEntry;
}

private:
uint32_t mCount;
LedgerNumHashPair mLedgerHashPair;
Mode mMode;
std::optional<HistoryArchiveState> mHAS;
std::optional<LedgerHeaderHistoryEntry> mHistoryEntry;
void checkInvariants() const;
};

uint32_t parseLedger(std::string const& str);
Expand Down
12 changes: 3 additions & 9 deletions src/catchup/CatchupRange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ void
checkCatchupPreconditions(uint32_t lastClosedLedger,
CatchupConfiguration const& configuration)
{
if (lastClosedLedger < LedgerManager::GENESIS_LEDGER_SEQ &&
!configuration.localBucketsOnly())
if (lastClosedLedger < LedgerManager::GENESIS_LEDGER_SEQ)
{
throw std::invalid_argument{"lastClosedLedger == 0"};
}
Expand Down Expand Up @@ -58,7 +57,7 @@ calculateCatchupRange(uint32_t lcl, CatchupConfiguration const& cfg,
}

// All remaining cases have LCL == genesis.
releaseAssert(lcl == init || cfg.localBucketsOnly());
releaseAssert(lcl == init);
LedgerRange fullReplay(init + 1, fullReplayCount);

// Case 2: full replay because count >= target - init.
Expand All @@ -70,8 +69,7 @@ calculateCatchupRange(uint32_t lcl, CatchupConfiguration const& cfg,
// Case 3: special case of buckets only, no replay; only
// possible when targeting the exact end of a checkpoint.
if (cfg.count() == 0 && (HistoryManager::isLastLedgerInCheckpoint(
cfg.toLedger(), hm.getConfig()) ||
cfg.localBucketsOnly()))
cfg.toLedger(), hm.getConfig())))
{
return CatchupRange(cfg.toLedger());
}
Expand Down Expand Up @@ -128,10 +126,6 @@ CatchupRange::CatchupRange(uint32_t lastClosedLedger,
: CatchupRange(calculateCatchupRange(lastClosedLedger, configuration,
historyManager))
{
if (configuration.localBucketsOnly())
{
releaseAssert(applyBuckets());
}
checkInvariants();
}

Expand Down
141 changes: 48 additions & 93 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ void
CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
LedgerNumHashPair rangeEnd)
{
releaseAssert(!mCatchupConfiguration.localBucketsOnly());

ZoneScoped;
auto verifyRange = catchupRange.getFullRangeIncludingBucketApply();
releaseAssert(verifyRange.mCount != 0);
Expand Down Expand Up @@ -194,9 +192,8 @@ CatchupWork::downloadVerifyTxResults(CatchupRange const& catchupRange)
bool
CatchupWork::alreadyHaveBucketsHistoryArchiveState(uint32_t atCheckpoint) const
{
return mCatchupConfiguration.localBucketsOnly() ||
atCheckpoint == mGetHistoryArchiveStateWork->getHistoryArchiveState()
.currentLedger;
return atCheckpoint ==
mGetHistoryArchiveStateWork->getHistoryArchiveState().currentLedger;
}

WorkSeqPtr
Expand All @@ -213,28 +210,23 @@ CatchupWork::downloadApplyBuckets()
std::vector<std::shared_ptr<BasicWork>> seq;
auto version = mApp.getConfig().LEDGER_PROTOCOL_VERSION;

// Download buckets, or skip if catchup is local
if (!mCatchupConfiguration.localBucketsOnly())
{
std::vector<std::string> hashes =
mBucketHAS->differingBuckets(mLocalState);
auto getBuckets = std::make_shared<DownloadBucketsWork>(
mApp, mBuckets, hashes, *mDownloadDir, mArchive);
seq.push_back(getBuckets);

auto verifyHASCallback = [has = *mBucketHAS](Application& app) {
if (!has.containsValidBuckets(app))
{
CLOG_ERROR(History, "Malformed HAS: invalid buckets");
return false;
}
return true;
};
auto verifyHAS = std::make_shared<WorkWithCallback>(mApp, "verify-has",
verifyHASCallback);
seq.push_back(verifyHAS);
version = mVerifiedLedgerRangeStart.header.ledgerVersion;
}
std::vector<std::string> hashes = mBucketHAS->differingBuckets(mLocalState);
auto getBuckets = std::make_shared<DownloadBucketsWork>(
mApp, mBuckets, hashes, *mDownloadDir, mArchive);
seq.push_back(getBuckets);

auto verifyHASCallback = [has = *mBucketHAS](Application& app) {
if (!has.containsValidBuckets(app))
{
CLOG_ERROR(History, "Malformed HAS: invalid buckets");
return false;
}
return true;
};
auto verifyHAS = std::make_shared<WorkWithCallback>(mApp, "verify-has",
verifyHASCallback);
seq.push_back(verifyHAS);
version = mVerifiedLedgerRangeStart.header.ledgerVersion;

auto applyBuckets = std::make_shared<ApplyBucketsWork>(
mApp, mBuckets, *mBucketHAS, version);
Expand Down Expand Up @@ -291,52 +283,37 @@ BasicWork::State
CatchupWork::getAndMaybeSetHistoryArchiveState()
{
// First, retrieve the HAS

// If we're just doing local catchup, set HAS right away
if (mCatchupConfiguration.localBucketsOnly())
if (!mGetHistoryArchiveStateWork)
{
auto toLedger = mCatchupConfiguration.toLedger() == 0
? "CURRENT"
: std::to_string(mCatchupConfiguration.toLedger());
CLOG_INFO(History,
"Starting catchup with configuration:\n lastClosedLedger: "
"{}\n toLedger: {}\n count: {}",
mApp.getLedgerManager().getLastClosedLedgerNum(), toLedger,
mCatchupConfiguration.count());

auto toCheckpoint =
mCatchupConfiguration.toLedger() == CatchupConfiguration::CURRENT
? CatchupConfiguration::CURRENT
: HistoryManager::checkpointContainingLedger(
mCatchupConfiguration.toLedger(), mApp.getConfig());
// Set retries to 10 to ensure we retry enough in case current
// checkpoint isn't published yet
mGetHistoryArchiveStateWork = addWork<GetHistoryArchiveStateWork>(
toCheckpoint, mArchive, true, 10);
mCurrentWork = mGetHistoryArchiveStateWork;
return State::WORK_RUNNING;
}
else if (mGetHistoryArchiveStateWork->getState() != State::WORK_SUCCESS)
{
mHAS = getCatchupConfiguration().getHAS();
releaseAssert(mHAS.has_value());
return mGetHistoryArchiveStateWork->getState();
}
else
{
// Otherwise, continue with the normal catchup flow: download and verify
// HAS from history archive
if (!mGetHistoryArchiveStateWork)
{
auto toLedger =
mCatchupConfiguration.toLedger() == 0
? "CURRENT"
: std::to_string(mCatchupConfiguration.toLedger());
CLOG_INFO(
History,
"Starting catchup with configuration:\n lastClosedLedger: "
"{}\n toLedger: {}\n count: {}",
mApp.getLedgerManager().getLastClosedLedgerNum(), toLedger,
mCatchupConfiguration.count());

auto toCheckpoint =
mCatchupConfiguration.toLedger() ==
CatchupConfiguration::CURRENT
? CatchupConfiguration::CURRENT
: HistoryManager::checkpointContainingLedger(
mCatchupConfiguration.toLedger(), mApp.getConfig());
// Set retries to 10 to ensure we retry enough in case current
// checkpoint isn't published yet
mGetHistoryArchiveStateWork = addWork<GetHistoryArchiveStateWork>(
toCheckpoint, mArchive, true, 10);
mCurrentWork = mGetHistoryArchiveStateWork;
return State::WORK_RUNNING;
}
else if (mGetHistoryArchiveStateWork->getState() != State::WORK_SUCCESS)
{
return mGetHistoryArchiveStateWork->getState();
}
else
{
mHAS = std::make_optional<HistoryArchiveState>(
mGetHistoryArchiveStateWork->getHistoryArchiveState());
}
mHAS = std::make_optional<HistoryArchiveState>(
mGetHistoryArchiveStateWork->getHistoryArchiveState());
}

// Second, perform some validation
Expand Down Expand Up @@ -448,8 +425,7 @@ CatchupWork::runCatchupStep()
// Bucket and transaction processing has started
if (mCatchupSeq)
{
releaseAssert(mDownloadVerifyLedgersSeq ||
mCatchupConfiguration.localBucketsOnly());
releaseAssert(mDownloadVerifyLedgersSeq);
releaseAssert(mTransactionsVerifyApplySeq ||
!catchupRange.replayLedgers());

Expand Down Expand Up @@ -494,8 +470,7 @@ CatchupWork::runCatchupStep()
// the node will have to catch up again and it will clear the
// ledger because clearRebuildForType has not been called yet.
mApp.getLedgerManager().setLastClosedLedger(
mVerifiedLedgerRangeStart,
!mCatchupConfiguration.localBucketsOnly());
mVerifiedLedgerRangeStart);
mBucketsAppliedEmitted = true;
mBuckets.clear();
mLastApplied =
Expand Down Expand Up @@ -534,22 +509,6 @@ CatchupWork::runCatchupStep()
return mCatchupSeq->getState();
}

// If we're just doing local catchup, setup bucket application and exit
if (mCatchupConfiguration.localBucketsOnly())
{
releaseAssert(catchupRange.applyBuckets());
auto lhhe = mCatchupConfiguration.getHistoryEntry();
releaseAssert(lhhe);
mVerifiedLedgerRangeStart = *lhhe;

mBucketVerifyApplySeq = downloadApplyBuckets();
std::vector<std::shared_ptr<BasicWork>> seq{mBucketVerifyApplySeq};

mCatchupSeq = addWork<WorkSequence>("catchup-seq", seq, RETRY_NEVER);
mCurrentWork = mCatchupSeq;
return State::WORK_RUNNING;
}

// Otherwise, proceed with normal flow. Still waiting for ledger headers
if (mDownloadVerifyLedgersSeq)
{
Expand Down Expand Up @@ -649,10 +608,6 @@ CatchupWork::onFailureRaise()
{
CLOG_WARNING(History, "Catchup failed");
Work::onFailureRaise();
if (mCatchupConfiguration.localBucketsOnly())
{
throw std::runtime_error("Unable to rebuild local state");
}
}

void
Expand Down
6 changes: 2 additions & 4 deletions src/catchup/LedgerApplyManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,8 @@ LedgerApplyManagerImpl::startCatchup(
configuration.toLedger(), lastClosedLedger));
}

// Offline and local catchup types aren't triggered by buffered ledgers
auto offlineCatchup =
configuration.offline() || configuration.localBucketsOnly();
releaseAssert(offlineCatchup == mSyncingLedgers.empty());
// Offline catchup isn't triggered by buffered ledgers
releaseAssert(configuration.offline() == mSyncingLedgers.empty());

releaseAssert(!mCatchupWork);

Expand Down
10 changes: 5 additions & 5 deletions src/herder/HerderPersistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ class HerderPersistence
std::vector<SCPEnvelope> const& envs,
QuorumTracker::QuorumMap const& qmap) = 0;

static size_t copySCPHistoryToStream(Database& db, soci::session& sess,
static size_t copySCPHistoryToStream(soci::session& sess,
uint32_t ledgerSeq,
uint32_t ledgerCount,
XDROutputFileStream& scpHistory);
// quorum information lookup
static std::optional<Hash>
getNodeQuorumSet(Database& db, soci::session& sess, NodeID const& nodeID);
static SCPQuorumSetPtr getQuorumSet(Database& db, soci::session& sess,
static std::optional<Hash> getNodeQuorumSet(soci::session& sess,
NodeID const& nodeID);
static SCPQuorumSetPtr getQuorumSet(soci::session& sess,
Hash const& qSetHash);

static void dropAll(Database& db);
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
static void deleteOldEntries(soci::session& sess, uint32_t ledgerSeq,
uint32_t count);
};
}
Loading