Skip to content

Commit 796392c

Browse files
committed
assumeState and catchup tests for Hot Archive BucketList
1 parent 3c0b7cf commit 796392c

38 files changed

+1368
-528
lines changed

src/bucket/BucketBase.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
385385
}
386386
if (countMergeEvents)
387387
{
388-
bucketManager.incrMergeCounters(mc);
388+
bucketManager.incrMergeCounters<BucketT>(mc);
389389
}
390390

391391
std::vector<Hash> shadowHashes;

src/bucket/BucketManager.cpp

+108-51
Original file line numberDiff line numberDiff line change
@@ -330,18 +330,36 @@ BucketManager::getMergeTimer()
330330
return mBucketSnapMerge;
331331
}
332332

333+
template <>
334+
MergeCounters
335+
BucketManager::readMergeCounters<LiveBucket>()
336+
{
337+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
338+
return mLiveMergeCounters;
339+
}
340+
341+
template <>
333342
MergeCounters
334-
BucketManager::readMergeCounters()
343+
BucketManager::readMergeCounters<HotArchiveBucket>()
344+
{
345+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
346+
return mHotArchiveMergeCounters;
347+
}
348+
349+
template <>
350+
void
351+
BucketManager::incrMergeCounters<LiveBucket>(MergeCounters const& delta)
335352
{
336353
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
337-
return mMergeCounters;
354+
mLiveMergeCounters += delta;
338355
}
339356

357+
template <>
340358
void
341-
BucketManager::incrMergeCounters(MergeCounters const& delta)
359+
BucketManager::incrMergeCounters<HotArchiveBucket>(MergeCounters const& delta)
342360
{
343361
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
344-
mMergeCounters += delta;
362+
mHotArchiveMergeCounters += delta;
345363
}
346364

347365
bool
@@ -623,7 +641,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
623641
auto future = promise.get_future().share();
624642
promise.set_value(bucket);
625643
mc.mFinishedMergeReattachments++;
626-
incrMergeCounters(mc);
644+
incrMergeCounters<BucketT>(mc);
627645
return future;
628646
}
629647
}
@@ -638,7 +656,7 @@ BucketManager::getMergeFutureInternal(MergeKey const& key,
638656
"BucketManager::getMergeFuture returning running future for merge {}",
639657
key);
640658
mc.mRunningMergeReattachments++;
641-
incrMergeCounters(mc);
659+
incrMergeCounters<BucketT>(mc);
642660
return i->second;
643661
}
644662

@@ -1013,10 +1031,10 @@ BucketManager::snapshotLedger(LedgerHeader& currentHeader)
10131031
currentHeader.ledgerVersion,
10141032
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
10151033
{
1016-
// TODO: Hash Archive Bucket
1017-
// Dependency: HAS supports Hot Archive BucketList
1018-
1019-
hash = mLiveBucketList->getHash();
1034+
SHA256 hsh;
1035+
hsh.add(mLiveBucketList->getHash());
1036+
hsh.add(mHotArchiveBucketList->getHash());
1037+
hash = hsh.finish();
10201038
}
10211039
else
10221040
{
@@ -1229,51 +1247,71 @@ BucketManager::assumeState(HistoryArchiveState const& has,
12291247
releaseAssert(threadIsMain());
12301248
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
12311249

1232-
// TODO: Assume archival bucket state
12331250
// Dependency: HAS supports Hot Archive BucketList
1234-
for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i)
1235-
{
1236-
auto curr = getBucketByHashInternal(
1237-
hexToBin256(has.currentBuckets.at(i).curr), mSharedLiveBuckets);
1238-
auto snap = getBucketByHashInternal(
1239-
hexToBin256(has.currentBuckets.at(i).snap), mSharedLiveBuckets);
1240-
if (!(curr && snap))
1241-
{
1242-
throw std::runtime_error("Missing bucket files while assuming "
1243-
"saved live BucketList state");
1244-
}
12451251

1246-
auto const& nextFuture = has.currentBuckets.at(i).next;
1247-
std::shared_ptr<LiveBucket> nextBucket = nullptr;
1248-
if (nextFuture.hasOutputHash())
1252+
auto processBucketList = [&](auto& bl, auto const& hasBuckets) {
1253+
auto kNumLevels = std::remove_reference<decltype(bl)>::type::kNumLevels;
1254+
using BucketT =
1255+
typename std::remove_reference<decltype(bl)>::type::bucket_type;
1256+
for (uint32_t i = 0; i < kNumLevels; ++i)
12491257
{
1250-
nextBucket = getBucketByHashInternal(
1251-
hexToBin256(nextFuture.getOutputHash()), mSharedLiveBuckets);
1252-
if (!nextBucket)
1258+
auto curr =
1259+
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).curr));
1260+
auto snap =
1261+
getBucketByHash<BucketT>(hexToBin256(hasBuckets.at(i).snap));
1262+
if (!(curr && snap))
12531263
{
1254-
throw std::runtime_error(
1255-
"Missing future bucket files while "
1256-
"assuming saved live BucketList state");
1264+
throw std::runtime_error("Missing bucket files while assuming "
1265+
"saved live BucketList state");
12571266
}
1258-
}
12591267

1260-
// Buckets on the BucketList should always be indexed
1261-
releaseAssert(curr->isEmpty() || curr->isIndexed());
1262-
releaseAssert(snap->isEmpty() || snap->isIndexed());
1263-
if (nextBucket)
1264-
{
1265-
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
1268+
auto const& nextFuture = hasBuckets.at(i).next;
1269+
std::shared_ptr<BucketT> nextBucket = nullptr;
1270+
if (nextFuture.hasOutputHash())
1271+
{
1272+
nextBucket = getBucketByHash<BucketT>(
1273+
hexToBin256(nextFuture.getOutputHash()));
1274+
if (!nextBucket)
1275+
{
1276+
throw std::runtime_error(
1277+
"Missing future bucket files while "
1278+
"assuming saved live BucketList state");
1279+
}
1280+
}
1281+
1282+
// Buckets on the BucketList should always be indexed
1283+
releaseAssert(curr->isEmpty() || curr->isIndexed());
1284+
releaseAssert(snap->isEmpty() || snap->isIndexed());
1285+
if (nextBucket)
1286+
{
1287+
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
1288+
}
1289+
1290+
bl.getLevel(i).setCurr(curr);
1291+
bl.getLevel(i).setSnap(snap);
1292+
bl.getLevel(i).setNext(nextFuture);
12661293
}
1294+
};
12671295

1268-
mLiveBucketList->getLevel(i).setCurr(curr);
1269-
mLiveBucketList->getLevel(i).setSnap(snap);
1270-
mLiveBucketList->getLevel(i).setNext(nextFuture);
1296+
processBucketList(*mLiveBucketList, has.currentBuckets);
1297+
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
1298+
if (has.hasHotArchiveBuckets())
1299+
{
1300+
processBucketList(*mHotArchiveBucketList, has.hotArchiveBuckets);
12711301
}
1302+
#endif
12721303

12731304
if (restartMerges)
12741305
{
12751306
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
12761307
has.currentLedger);
1308+
#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
1309+
if (has.hasHotArchiveBuckets())
1310+
{
1311+
mHotArchiveBucketList->restartMerges(mApp, maxProtocolVersion,
1312+
has.currentLedger);
1313+
}
1314+
#endif
12771315
}
12781316
cleanupStaleFiles(has);
12791317
}
@@ -1580,16 +1618,35 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
15801618
continue;
15811619
}
15821620

1583-
// TODO: Update verify to for ArchiveBucket
1584-
// Dependency: HAS supports Hot Archive BucketList
1585-
auto b = getBucketByHashInternal(h, mSharedLiveBuckets);
1586-
if (!b)
1587-
{
1588-
throw std::runtime_error(fmt::format(
1589-
FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
1590-
}
1591-
seq.emplace_back(std::make_shared<VerifyBucketWork>(
1592-
mApp, b->getFilename().string(), b->getHash(), nullptr));
1621+
auto loadFilenameAndHash = [&]() -> std::pair<std::string, Hash> {
1622+
auto live = getBucketByHashInternal(h, mSharedLiveBuckets);
1623+
if (!live)
1624+
{
1625+
auto hot = getBucketByHashInternal(h, mSharedHotArchiveBuckets);
1626+
1627+
// Check both live and hot archive buckets for hash. If we don't
1628+
// find it in either, we're missing a bucket. Note that live and
1629+
// hot archive buckets are guaranteed to have no hash collisions
1630+
// due to type field in MetaEntry.
1631+
if (!hot)
1632+
{
1633+
throw std::runtime_error(
1634+
fmt::format(FMT_STRING("Missing referenced bucket {}"),
1635+
binToHex(h)));
1636+
}
1637+
return std::make_pair(hot->getFilename().string(),
1638+
hot->getHash());
1639+
}
1640+
else
1641+
{
1642+
return std::make_pair(live->getFilename().string(),
1643+
live->getHash());
1644+
}
1645+
};
1646+
1647+
auto [filename, hash] = loadFilenameAndHash();
1648+
seq.emplace_back(
1649+
std::make_shared<VerifyBucketWork>(mApp, filename, hash, nullptr));
15931650
}
15941651
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
15951652
"verify-referenced-buckets", seq);

src/bucket/BucketManager.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ class BucketManager : NonMovableOrCopyable
103103
medida::Counter& mLiveBucketListSizeCounter;
104104
medida::Counter& mArchiveBucketListSizeCounter;
105105
EvictionCounters mBucketListEvictionCounters;
106-
MergeCounters mMergeCounters;
106+
MergeCounters mLiveMergeCounters;
107+
MergeCounters mHotArchiveMergeCounters;
107108
std::shared_ptr<EvictionStatistics> mEvictionStatistics{};
108109
std::map<LedgerEntryTypeAndDurability, medida::Counter&>
109110
mBucketListEntryCountCounters;
@@ -203,8 +204,8 @@ class BucketManager : NonMovableOrCopyable
203204

204205
// Reading and writing the merge counters is done in bulk, and takes a lock
205206
// briefly; this can be done from any thread.
206-
MergeCounters readMergeCounters();
207-
void incrMergeCounters(MergeCounters const& delta);
207+
template <class BucketT> MergeCounters readMergeCounters();
208+
template <class BucketT> void incrMergeCounters(MergeCounters const& delta);
208209

209210
// Get a reference to a persistent bucket (in the BucketManager's bucket
210211
// directory), from the BucketManager's shared bucket-set.

src/bucket/HotArchiveBucket.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ HotArchiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
3636

3737
if (countMergeEvents)
3838
{
39-
bucketManager.incrMergeCounters(mc);
39+
bucketManager.incrMergeCounters<HotArchiveBucket>(mc);
4040
}
4141

4242
return out.getBucket(bucketManager);

src/bucket/HotArchiveBucket.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,6 @@ typedef BucketOutputIterator<HotArchiveBucket> HotArchiveBucketOutputIterator;
2525
class HotArchiveBucket : public BucketBase,
2626
public std::enable_shared_from_this<HotArchiveBucket>
2727
{
28-
static std::vector<HotArchiveBucketEntry>
29-
convertToBucketEntry(std::vector<LedgerEntry> const& archivedEntries,
30-
std::vector<LedgerKey> const& restoredEntries,
31-
std::vector<LedgerKey> const& deletedEntries);
32-
3328
public:
3429
// Entry type that this bucket stores
3530
using EntryT = HotArchiveBucketEntry;
@@ -91,6 +86,11 @@ class HotArchiveBucket : public BucketBase,
9186
static std::shared_ptr<LoadT>
9287
bucketEntryToLoadResult(std::shared_ptr<EntryT> const& be);
9388

89+
static std::vector<HotArchiveBucketEntry>
90+
convertToBucketEntry(std::vector<LedgerEntry> const& archivedEntries,
91+
std::vector<LedgerKey> const& restoredEntries,
92+
std::vector<LedgerKey> const& deletedEntries);
93+
9494
friend class HotArchiveBucketSnapshot;
9595
};
9696
}

src/bucket/HotArchiveBucketList.h

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ namespace stellar
1515
class HotArchiveBucketList : public BucketListBase<HotArchiveBucket>
1616
{
1717
public:
18+
using bucket_type = HotArchiveBucket;
19+
1820
void addBatch(Application& app, uint32_t currLedger,
1921
uint32_t currLedgerProtocol,
2022
std::vector<LedgerEntry> const& archiveEntries,

src/bucket/LiveBucket.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ LiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
387387

388388
if (countMergeEvents)
389389
{
390-
bucketManager.incrMergeCounters(mc);
390+
bucketManager.incrMergeCounters<LiveBucket>(mc);
391391
}
392392

393393
return out.getBucket(bucketManager);

src/bucket/LiveBucketList.h

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ namespace stellar
1717
class LiveBucketList : public BucketListBase<LiveBucket>
1818
{
1919
public:
20+
using bucket_type = LiveBucket;
21+
2022
// Reset Eviction Iterator position if an incoming spill or upgrade has
2123
// invalidated the previous position
2224
static void updateStartingEvictionIterator(EvictionIterator& iter,

src/bucket/test/BucketListTests.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -1139,9 +1139,10 @@ TEST_CASE_VERSIONS("eviction scan", "[bucketlist][archival]")
11391139
// Close ledgers until evicted DEADENTRYs merge with
11401140
// original INITENTRYs. This checks that BucketList
11411141
// invariants are respected
1142-
for (auto initialDeadMerges =
1143-
bm.readMergeCounters().mOldInitEntriesMergedWithNewDead;
1144-
bm.readMergeCounters().mOldInitEntriesMergedWithNewDead <
1142+
for (auto initialDeadMerges = bm.readMergeCounters<LiveBucket>()
1143+
.mOldInitEntriesMergedWithNewDead;
1144+
bm.readMergeCounters<LiveBucket>()
1145+
.mOldInitEntriesMergedWithNewDead <
11451146
initialDeadMerges + tempEntries.size();
11461147
++ledgerSeq)
11471148
{

0 commit comments

Comments
 (0)