History Archive Support for State Archival #4610

Open · wants to merge 5 commits into master
3 changes: 2 additions & 1 deletion src/Makefile.am
@@ -81,7 +81,8 @@ TEST_FILES = $(TESTDATA_DIR)/stellar-core_example.cfg $(TESTDATA_DIR)/stellar-co
 	$(TESTDATA_DIR)/stellar-core_testnet.cfg $(TESTDATA_DIR)/stellar-core_testnet_legacy.cfg \
 	$(TESTDATA_DIR)/stellar-history.testnet.6714239.json $(TESTDATA_DIR)/stellar-history.livenet.15686975.json \
 	$(TESTDATA_DIR)/stellar-core_testnet_validator.cfg $(TESTDATA_DIR)/stellar-core_example_validators.cfg \
-	$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json
+	$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.json \
+	$(TESTDATA_DIR)/stellar-history.testnet.6714239.networkPassphrase.v2.json

 BUILT_SOURCES = $(SRC_X_FILES:.x=.h) main/StellarCoreVersion.cpp main/XDRFilesSha256.cpp $(TEST_FILES)
2 changes: 1 addition & 1 deletion src/bucket/BucketBase.cpp
@@ -393,7 +393,7 @@ BucketBase<BucketT, IndexT>::merge(
     }
     if (countMergeEvents)
     {
-        bucketManager.incrMergeCounters(mc);
+        bucketManager.incrMergeCounters<BucketT>(mc);
     }

     std::vector<Hash> shadowHashes;
192 changes: 126 additions & 66 deletions src/bucket/BucketManager.cpp
@@ -348,13 +348,6 @@ BucketManager::getBloomLookupMeter() const
         {BucketT::METRIC_STRING, "bloom", "lookups"}, "bloom");
 }

-MergeCounters
-BucketManager::readMergeCounters()
-{
-    std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
-    return mMergeCounters;
-}
-
 medida::Meter&
 BucketManager::getCacheHitMeter() const
 {
@@ -367,11 +360,36 @@ BucketManager::getCacheMissMeter() const
     return mCacheMissMeter;
 }

+template <>
+MergeCounters
+BucketManager::readMergeCounters<LiveBucket>()
+{
+    std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
+    return mLiveMergeCounters;
+}
+
+template <>
+MergeCounters
+BucketManager::readMergeCounters<HotArchiveBucket>()
+{
+    std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
+    return mHotArchiveMergeCounters;
+}
+
+template <>
 void
-BucketManager::incrMergeCounters(MergeCounters const& delta)
+BucketManager::incrMergeCounters<LiveBucket>(MergeCounters const& delta)
 {
     std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
-    mMergeCounters += delta;
+    mLiveMergeCounters += delta;
+}
+
+template <>
+void
+BucketManager::incrMergeCounters<HotArchiveBucket>(MergeCounters const& delta)
+{
+    std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
+    mHotArchiveMergeCounters += delta;
 }

 bool
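The change above replaces the single mMergeCounters with per-bucket-type state selected via explicit template specialization. A minimal, self-contained sketch of that pattern follows (simplified stand-in types, not stellar-core code); it shows why call sites in templated merge code can stay generic as incrMergeCounters<BucketT>(mc) while live and hot-archive metrics can never mix:

#include <iostream>
#include <mutex>

struct MergeCounters
{
    unsigned long mFinishedMergeReattachments = 0;
    MergeCounters&
    operator+=(MergeCounters const& o)
    {
        mFinishedMergeReattachments += o.mFinishedMergeReattachments;
        return *this;
    }
};

struct LiveBucket
{
};
struct HotArchiveBucket
{
};

class CounterStore
{
    std::mutex mMutex;
    MergeCounters mLive;
    MergeCounters mHot;

  public:
    template <class BucketT> void incrMergeCounters(MergeCounters const& delta);
    template <class BucketT> MergeCounters readMergeCounters();
};

// One explicit specialization per bucket type routes to its own counters.
template <>
void
CounterStore::incrMergeCounters<LiveBucket>(MergeCounters const& delta)
{
    std::lock_guard<std::mutex> lock(mMutex);
    mLive += delta;
}

template <>
void
CounterStore::incrMergeCounters<HotArchiveBucket>(MergeCounters const& delta)
{
    std::lock_guard<std::mutex> lock(mMutex);
    mHot += delta;
}

template <>
MergeCounters
CounterStore::readMergeCounters<LiveBucket>()
{
    std::lock_guard<std::mutex> lock(mMutex);
    return mLive;
}

template <>
MergeCounters
CounterStore::readMergeCounters<HotArchiveBucket>()
{
    std::lock_guard<std::mutex> lock(mMutex);
    return mHot;
}

int
main()
{
    CounterStore store;
    MergeCounters delta;
    delta.mFinishedMergeReattachments = 1;
    store.incrMergeCounters<LiveBucket>(delta);
    // Prints 1; the hot-archive counters are untouched.
    std::cout << store.readMergeCounters<LiveBucket>().mFinishedMergeReattachments
              << "\n";
    return 0;
}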
@@ -653,7 +671,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
             auto future = promise.get_future().share();
             promise.set_value(bucket);
             mc.mFinishedMergeReattachments++;
-            incrMergeCounters(mc);
+            incrMergeCounters<BucketT>(mc);
             return future;
         }
     }
@@ -668,7 +686,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
         "BucketManager::getMergeFuture returning running future for merge {}",
         key);
     mc.mRunningMergeReattachments++;
-    incrMergeCounters(mc);
+    incrMergeCounters<BucketT>(mc);
     return i->second;
 }

@@ -1044,10 +1062,10 @@ BucketManager::snapshotLedger(LedgerHeader& currentHeader)
                 HotArchiveBucket::
                     FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
         {
-            // TODO: Hash Archive Bucket
-            // Dependency: HAS supports Hot Archive BucketList
-
-            hash = mLiveBucketList->getHash();
+            SHA256 hsh;
+            hsh.add(mLiveBucketList->getHash());
+            hsh.add(mHotArchiveBucketList->getHash());
+            hash = hsh.finish();
         }
         else
         {

[Review comment] Contributor: It lives!
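This is where the hot archive becomes consensus-visible: once the protocol supports persistent eviction, the ledger header's bucket list hash commits to both trees as SHA256(liveHash || hotHash) instead of just the live hash. A sketch of the composition follows; the SHA256 type here is a stand-in with the same add()/finish() shape as the one in the diff, and std::hash is used only so the sketch runs (it is not a real SHA-256):

#include <functional>
#include <iostream>
#include <string>

// Toy stand-in for a streaming hash with add()/finish().
struct SHA256
{
    std::string buf;
    void
    add(std::string const& bytes)
    {
        buf += bytes;
    }
    std::size_t
    finish() const
    {
        return std::hash<std::string>{}(buf);
    }
};

int
main()
{
    std::string liveHash = "hash-of-live-bucket-list";
    std::string hotHash = "hash-of-hot-archive-bucket-list";

    SHA256 hsh;
    hsh.add(liveHash);
    hsh.add(hotHash); // order matters: live first, then hot archive
    std::cout << hsh.finish() << "\n";
    return 0;
}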
@@ -1094,7 +1112,8 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
         mSnapshotManager->copySearchableLiveBucketListSnapshot();
     auto const& sas = cfg.stateArchivalSettings();

-    using task_t = std::packaged_task<EvictionResultCandidates()>;
+    using task_t =
+        std::packaged_task<std::unique_ptr<EvictionResultCandidates>()>;

     // MSVC gotcha: searchableBL has to be shared_ptr because MSVC wants to
     // copy this lambda, otherwise we could use unique_ptr.
     auto task = std::make_shared<task_t>(

[Review comment] Contributor: Curious: why the extra layer of indirection here with the unique_ptr?

[Reply] Author: I threw a bunch of consts in EvictionResultCandidates, which removed the default copy constructor and copy assignment operator, which I need in BucketManager.cpp::1154. Eviction candidates can be pretty large, so we probably don't want to copy them anyway. I think the correct solution is to make EvictionResultCandidates move-only and use unique_ptr everywhere.
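To make the author's point concrete: a const data member suppresses the implicitly-declared copy and move assignment operators, so a plain std::future<EvictionResultCandidates> result could no longer be assigned around, while a unique_ptr moves through the future cheaply. A sketch with simplified stand-in types (not the real EvictionResultCandidates):

#include <cstdint>
#include <future>
#include <iostream>
#include <memory>

struct EvictionResultCandidates
{
    // const member: copy/move *assignment* are suppressed, so a plain
    // std::future<EvictionResultCandidates> result could not be reassigned.
    uint32_t const initialLedgerSeq;

    explicit EvictionResultCandidates(uint32_t seq) : initialLedgerSeq(seq)
    {
    }
};

int
main()
{
    // The task produces a unique_ptr, so only a pointer moves through the
    // future; the (potentially large) candidate set is never copied.
    std::packaged_task<std::unique_ptr<EvictionResultCandidates>()> task(
        [] { return std::make_unique<EvictionResultCandidates>(1234); });
    auto fut = task.get_future();
    task(); // in stellar-core this would run on a background thread
    auto candidates = fut.get();
    std::cout << candidates->initialLedgerSeq << "\n"; // prints 1234
    return 0;
}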
@@ -1129,14 +1148,14 @@ BucketManager::resolveBackgroundEvictionScan(

     // If eviction related settings changed during the ledger, we have to
     // restart the scan
-    if (!evictionCandidates.isValid(ledgerSeq,
-                                    networkConfig.stateArchivalSettings()))
+    if (!evictionCandidates->isValid(ledgerSeq, ledgerVers,
+                                     networkConfig.stateArchivalSettings()))
     {
         startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
         evictionCandidates = mEvictionFuture.get();
     }

-    auto& eligibleEntries = evictionCandidates.eligibleEntries;
+    auto& eligibleEntries = evictionCandidates->eligibleEntries;

     for (auto iter = eligibleEntries.begin(); iter != eligibleEntries.end();)
     {
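The restart logic above follows a simple pattern: consume the async result, and if its assumptions went stale while the scan ran, synchronously redo it once. A stripped-down sketch with stand-in types (the real check is isValid against ledger sequence, protocol version, and archival settings):

#include <future>
#include <iostream>
#include <memory>

struct Result
{
    int ledgerSeq;
};

int
main()
{
    auto startScan = [](int seq) {
        return std::async(std::launch::async, [seq] {
            return std::make_unique<Result>(Result{seq});
        });
    };

    int currentLedger = 101;
    auto fut = startScan(100); // scan kicked off one ledger earlier
    auto candidates = fut.get();

    if (candidates->ledgerSeq != currentLedger) // stale: must restart
    {
        fut = startScan(currentLedger);
        candidates = fut.get(); // block once; correctness over latency
    }
    std::cout << candidates->ledgerSeq << "\n"; // prints 101
    return 0;
}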
@@ -1154,7 +1173,7 @@ BucketManager::resolveBackgroundEvictionScan(
     auto remainingEntriesToEvict =
         networkConfig.stateArchivalSettings().maxEntriesToArchive;
     auto entryToEvictIter = eligibleEntries.begin();
-    auto newEvictionIterator = evictionCandidates.endOfRegionIterator;
+    auto newEvictionIterator = evictionCandidates->endOfRegionIterator;

     // Return vectors include both evicted entry and associated TTL
     std::vector<LedgerKey> deletedKeys;
@@ -1194,7 +1213,7 @@ BucketManager::resolveBackgroundEvictionScan(
     // region
     if (remainingEntriesToEvict != 0)
     {
-        newEvictionIterator = evictionCandidates.endOfRegionIterator;
+        newEvictionIterator = evictionCandidates->endOfRegionIterator;
     }

     networkConfig.updateEvictionIterator(ltx, newEvictionIterator);
@@ -1250,53 +1269,71 @@ BucketManager::assumeState(HistoryArchiveState const& has,
     releaseAssert(threadIsMain());
     releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

-    // TODO: Assume archival bucket state
-    // Dependency: HAS supports Hot Archive BucketList
-    for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
-    {
-        auto curr = getBucketByHashInternal(
-            hexToBin256(has.currentBuckets.at(i).curr), mSharedLiveBuckets);
-        auto snap = getBucketByHashInternal(
-            hexToBin256(has.currentBuckets.at(i).snap), mSharedLiveBuckets);
-        if (!(curr && snap))
-        {
-            throw std::runtime_error("Missing bucket files while assuming "
-                                     "saved live BucketList state");
-        }
-
-        auto const& nextFuture = has.currentBuckets.at(i).next;
-        std::shared_ptr<LiveBucket> nextBucket = nullptr;
-        if (nextFuture.hasOutputHash())
-        {
-            nextBucket = getBucketByHashInternal(
-                hexToBin256(nextFuture.getOutputHash()), mSharedLiveBuckets);
-            if (!nextBucket)
-            {
-                throw std::runtime_error(
-                    "Missing future bucket files while "
-                    "assuming saved live BucketList state");
-            }
-        }
-
-        // Buckets on the BucketList should always be indexed
-        releaseAssert(curr->isEmpty() || curr->isIndexed());
-        releaseAssert(snap->isEmpty() || snap->isIndexed());
-        if (nextBucket)
-        {
-            releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
-        }
-
-        mLiveBucketList->getLevel(i).setCurr(curr);
-        mLiveBucketList->getLevel(i).setSnap(snap);
-        mLiveBucketList->getLevel(i).setNext(nextFuture);
-    }
+    auto processBucketList = [&](auto& bl, auto const& hasBuckets) {
+        auto kNumLevels = std::remove_reference<decltype(bl)>::type::kNumLevels;
+        using BucketT =
+            typename std::remove_reference<decltype(bl)>::type::bucket_type;
+        for (uint32_t i = 0; i < kNumLevels; ++i)
+        {
+            auto curr =
+                getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).curr));
+            auto snap =
+                getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).snap));
+            if (!(curr && snap))
+            {
+                throw std::runtime_error("Missing bucket files while assuming "
+                                         "saved live BucketList state");
+            }
+
+            auto const& nextFuture = hasBuckets.at(i).next;
+            std::shared_ptr<BucketT> nextBucket = nullptr;
+            if (nextFuture.hasOutputHash())
+            {
+                nextBucket = getBucketByHash<BucketT>(
+                    hexToBin256(nextFuture.getOutputHash()));
+                if (!nextBucket)
+                {
+                    throw std::runtime_error(
+                        "Missing future bucket files while "
+                        "assuming saved live BucketList state");
+                }
+            }
+
+            // Buckets on the BucketList should always be indexed
+            releaseAssert(curr->isEmpty() || curr->isIndexed());
+            releaseAssert(snap->isEmpty() || snap->isIndexed());
+            if (nextBucket)
+            {
+                releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
+            }
+
+            bl.getLevel(i).setCurr(curr);
+            bl.getLevel(i).setSnap(snap);
+            bl.getLevel(i).setNext(nextFuture);
+        }
+    };
+
+    processBucketList(*mLiveBucketList, has.currentBuckets);
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+    if (has.hasHotArchiveBuckets())
+    {
+        processBucketList(*mHotArchiveBucketList, has.hotArchiveBuckets);
+    }
+#endif

     mLiveBucketList->maybeInitializeCaches(mConfig);

     if (restartMerges)
     {
         mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
                                        has.currentLedger);
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+        if (has.hasHotArchiveBuckets())
+        {
+            mHotArchiveBucketList->restartMerges(mApp, maxProtocolVersion,
+                                                 has.currentLedger);
+        }
+#endif
     }
     cleanupStaleFiles(has);
 }
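The lambda-based rewrite of assumeState works because both bucket list types expose the same static interface (kNumLevels, bucket_type, getLevel), so a single generic lambda instantiates one body per list type. A minimal sketch of the deduction trick, with illustrative stand-in types only:

#include <cstdint>
#include <iostream>
#include <type_traits>

struct LiveBucket
{
    static constexpr char const* name = "live";
};
struct HotArchiveBucket
{
    static constexpr char const* name = "hot-archive";
};

// Stand-in for LiveBucketList / HotArchiveBucketList: each exposes its
// bucket type and level count as static members.
template <class BucketT, uint32_t Levels> struct BucketListOf
{
    using bucket_type = BucketT;
    static constexpr uint32_t kNumLevels = Levels;
};

int
main()
{
    auto processBucketList = [](auto& bl) {
        using BLT = typename std::remove_reference<decltype(bl)>::type;
        using BucketT = typename BLT::bucket_type;
        for (uint32_t i = 0; i < BLT::kNumLevels; ++i)
        {
            std::cout << "restoring " << BucketT::name << " level " << i
                      << "\n";
        }
    };

    BucketListOf<LiveBucket, 3> live;
    BucketListOf<HotArchiveBucket, 2> hot;
    processBucketList(live); // same body, two instantiations
    processBucketList(hot);
    return 0;
}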
@@ -1372,7 +1409,8 @@ BucketManager::loadCompleteLedgerState(HistoryArchiveState const& has)
     std::vector<std::pair<Hash, std::string>> hashes;
     for (uint32_t i = LiveBucketList::kNumLevels; i > 0; --i)
     {
-        HistoryStateBucket const& hsb = has.currentBuckets.at(i - 1);
+        HistoryStateBucket<LiveBucket> const& hsb =
+            has.currentBuckets.at(i - 1);
         hashes.emplace_back(hexToBin256(hsb.snap),
                             fmt::format(FMT_STRING("snap {:d}"), i - 1));
         hashes.emplace_back(hexToBin256(hsb.curr),
@@ -1549,7 +1587,7 @@ BucketManager::visitLedgerEntries(
     std::vector<std::pair<Hash, std::string>> hashes;
     for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
     {
-        HistoryStateBucket const& hsb = has.currentBuckets.at(i);
+        HistoryStateBucket<LiveBucket> const& hsb = has.currentBuckets.at(i);
         hashes.emplace_back(hexToBin256(hsb.curr),
                             fmt::format(FMT_STRING("curr {:d}"), i));
         hashes.emplace_back(hexToBin256(hsb.snap),
@@ -1599,7 +1637,9 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
     // Persist a map of indexes so we don't have dangling references in
     // VerifyBucketsWork. We don't actually need to use the indexes created by
     // VerifyBucketsWork here, so a throwaway static map is fine.
-    static std::map<int, std::unique_ptr<LiveBucketIndex const>> indexMap;
+    static std::map<int, std::unique_ptr<LiveBucketIndex const>> liveIndexMap;
+    static std::map<int, std::unique_ptr<HotArchiveBucketIndex const>>
+        hotIndexMap;

     int i = 0;
     for (auto const& h : hashes)
@@ -1609,19 +1649,39 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
             continue;
         }

-        // TODO: Update verify to for ArchiveBucket
-        // Dependency: HAS supports Hot Archive BucketList
-        auto b = getBucketByHashInternal(h, mSharedLiveBuckets);
-        if (!b)
+        auto maybeLiveBucket = getBucketByHashInternal(h, mSharedLiveBuckets);
+        if (!maybeLiveBucket)
         {
-            throw std::runtime_error(fmt::format(
-                FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
+            auto hotBucket =
+                getBucketByHashInternal(h, mSharedHotArchiveBuckets);
+
+            // Check both live and hot archive buckets for hash. If we don't
+            // find it in either, we're missing a bucket. Note that live and
+            // hot archive buckets are guaranteed to have no hash collisions
+            // due to type field in MetaEntry.
+            if (!hotBucket)
+            {
+                throw std::runtime_error(fmt::format(
+                    FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
+            }
+
+            auto filename = hotBucket->getFilename().string();
+            auto hash = hotBucket->getHash();
+            auto [indexIter, _] = hotIndexMap.emplace(i++, nullptr);
+
+            seq.emplace_back(
+                std::make_shared<VerifyBucketWork<HotArchiveBucket>>(
+                    mApp, filename, hash, indexIter->second, nullptr));
         }
+        else
+        {
+            auto filename = maybeLiveBucket->getFilename().string();
+            auto hash = maybeLiveBucket->getHash();
+            auto [indexIter, _] = liveIndexMap.emplace(i++, nullptr);

-        auto [indexIter, _] = indexMap.emplace(i++, nullptr);
-        seq.emplace_back(std::make_shared<VerifyBucketWork>(
-            mApp, b->getFilename().string(), b->getHash(), indexIter->second,
-            nullptr));
+            seq.emplace_back(std::make_shared<VerifyBucketWork<LiveBucket>>(
+                mApp, filename, hash, indexIter->second, nullptr));
+        }
     }
     return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
         "verify-referenced-buckets", seq);
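The verification change above resolves each referenced hash against the live bucket set first and falls back to the hot-archive set; per the diff's comment, the type field in each bucket's MetaEntry guarantees the same hash cannot appear in both sets. A sketch of that two-set lookup with illustrative stand-in names:

#include <iostream>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

using Hash = std::string;

// Return the filename for a hash if it is present in the given bucket set.
std::optional<std::string>
find(std::map<Hash, std::string> const& set, Hash const& h)
{
    auto it = set.find(h);
    if (it == set.end())
    {
        return std::nullopt;
    }
    return it->second;
}

int
main()
{
    std::map<Hash, std::string> liveBuckets = {{"aa", "live-aa.xdr"}};
    std::map<Hash, std::string> hotBuckets = {{"bb", "hot-bb.xdr"}};

    for (Hash h : {"aa", "bb"})
    {
        if (auto f = find(liveBuckets, h))
        {
            std::cout << "verify live bucket " << *f << "\n";
        }
        else if (auto f = find(hotBuckets, h))
        {
            std::cout << "verify hot archive bucket " << *f << "\n";
        }
        else
        {
            throw std::runtime_error("Missing referenced bucket " + h);
        }
    }
    return 0;
}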
9 changes: 5 additions & 4 deletions src/bucket/BucketManager.h
@@ -103,14 +103,15 @@ class BucketManager : NonMovableOrCopyable
     medida::Meter& mCacheHitMeter;
     medida::Meter& mCacheMissMeter;
     EvictionCounters mBucketListEvictionCounters;
-    MergeCounters mMergeCounters;
+    MergeCounters mLiveMergeCounters;
+    MergeCounters mHotArchiveMergeCounters;
     std::shared_ptr<EvictionStatistics> mEvictionStatistics{};
     std::map<LedgerEntryTypeAndDurability, medida::Counter&>
         mBucketListEntryCountCounters;
     std::map<LedgerEntryTypeAndDurability, medida::Counter&>
         mBucketListEntrySizeCounters;

-    std::future<EvictionResultCandidates> mEvictionFuture{};
+    std::future<std::unique_ptr<EvictionResultCandidates>> mEvictionFuture{};

     // Copy app's config for thread-safe access
     Config const mConfig;
@@ -204,8 +205,8 @@ class BucketManager : NonMovableOrCopyable

     // Reading and writing the merge counters is done in bulk, and takes a lock
     // briefly; this can be done from any thread.
-    MergeCounters readMergeCounters();
-    void incrMergeCounters(MergeCounters const& delta);
+    template <class BucketT> MergeCounters readMergeCounters();
+    template <class BucketT> void incrMergeCounters(MergeCounters const& delta);

     // Get a reference to a persistent bucket (in the BucketManager's bucket
     // directory), from the BucketManager's shared bucket-set.
20 changes: 18 additions & 2 deletions src/bucket/BucketUtils.cpp
@@ -136,10 +136,26 @@ MergeCounters::operator==(MergeCounters const& other) const
 // Check that eviction scan is based off of current ledger snapshot and that
 // archival settings have not changed
 bool
-EvictionResultCandidates::isValid(uint32_t currLedger,
+EvictionResultCandidates::isValid(uint32_t currLedgerSeq,
+                                  uint32_t currLedgerVers,
                                   StateArchivalSettings const& currSas) const
 {
-    return initialLedger == currLedger &&
+    // If the eviction scan started before a protocol upgrade, and the protocol
+    // upgrade changes eviction scan behavior during the scan, we need
+    // to restart with the new protocol version. We only care about
+    // `FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION`, other upgrades don't
+    // affect evictions scans.
+    if (protocolVersionIsBefore(
+            initialLedgerVers,
+            LiveBucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION) &&
+        protocolVersionStartsFrom(
+            currLedgerVers,
+            LiveBucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
+    {
+        return false;
+    }
+
+    return initialLedgerSeq == currLedgerSeq &&
            initialSas.maxEntriesToArchive == currSas.maxEntriesToArchive &&
           initialSas.evictionScanSize == currSas.evictionScanSize &&
            initialSas.startingEvictionScanLevel ==
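The new validity rule discards a scan result if the persistent-eviction protocol boundary was crossed while the scan ran. A sketch of the check in isolation; the real constant lives on LiveBucket, and the value 23 below is only an assumed illustration:

#include <cstdint>
#include <iostream>

// Assumed illustration value; stellar-core defines this on LiveBucket.
constexpr uint32_t FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION = 23;

// Modeled as plain comparisons, matching their names.
bool
protocolVersionIsBefore(uint32_t v, uint32_t target)
{
    return v < target;
}
bool
protocolVersionStartsFrom(uint32_t v, uint32_t target)
{
    return v >= target;
}

// True when the scan started below the boundary and the current ledger is
// at or above it, i.e. eviction semantics changed mid-scan.
bool
crossedEvictionBoundary(uint32_t initialLedgerVers, uint32_t currLedgerVers)
{
    return protocolVersionIsBefore(
               initialLedgerVers,
               FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION) &&
           protocolVersionStartsFrom(
               currLedgerVers, FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION);
}

int
main()
{
    std::cout << crossedEvictionBoundary(22, 23) << "\n"; // 1: restart scan
    std::cout << crossedEvictionBoundary(23, 23) << "\n"; // 0: still valid
    return 0;
}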