Skip to content

Commit 7674f7e

Browse files
committed
Minor fixes
1 parent be126ad commit 7674f7e

11 files changed

+137
-109
lines changed

src/bucket/BucketList.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ BucketLevel::getNext()
6363
void
6464
BucketLevel::setNext(FutureBucket const& fb)
6565
{
66-
releaseAssert(threadIsMain());
66+
assertThreadIsMain();
6767
mNextCurr = fb;
6868
}
6969

@@ -82,7 +82,7 @@ BucketLevel::getSnap() const
8282
void
8383
BucketLevel::setCurr(std::shared_ptr<Bucket> b)
8484
{
85-
releaseAssert(threadIsMain());
85+
assertThreadIsMain();
8686
mNextCurr.clear();
8787
mCurr = b;
8888
}
@@ -117,7 +117,7 @@ BucketList::shouldMergeWithEmptyCurr(uint32_t ledger, uint32_t level)
117117
void
118118
BucketLevel::setSnap(std::shared_ptr<Bucket> b)
119119
{
120-
releaseAssert(threadIsMain());
120+
assertThreadIsMain();
121121
mSnap = b;
122122
}
123123

src/bucket/BucketList.h

+1
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ class BucketList
478478
// Return level `i` of the BucketList.
479479
BucketLevel const& getLevel(uint32_t i) const;
480480

481+
// Return level `i` of the BucketList.
481482
BucketLevel& getLevel(uint32_t i);
482483

483484
// Return a cumulative hash of the entire bucketlist; this is the hash of

src/bucket/BucketListSnapshot.cpp

+8-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace stellar
1616
BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
1717
: mLedgerSeq(ledgerSeq)
1818
{
19-
releaseAssert(threadIsMain());
19+
assertThreadIsMain();
2020

2121
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
2222
{
@@ -25,6 +25,11 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
2525
}
2626
}
2727

28+
BucketListSnapshot::BucketListSnapshot(BucketListSnapshot const& snapshot)
29+
: mLevels(snapshot.mLevels), mLedgerSeq(snapshot.mLedgerSeq)
30+
{
31+
}
32+
2833
std::vector<BucketLevelSnapshot> const&
2934
BucketListSnapshot::getLevels() const
3035
{
@@ -153,7 +158,7 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
153158
ZoneScoped;
154159

155160
// This query should only be called during TX apply
156-
releaseAssert(threadIsMain());
161+
assertThreadIsMain();
157162
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
158163

159164
LedgerKeySet trustlinesToLoad;
@@ -189,7 +194,7 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
189194

190195
// This is a legacy query, should only be called by main thread during
191196
// catchup
192-
releaseAssert(threadIsMain());
197+
assertThreadIsMain();
193198
auto timer = mSnapshotManager.recordBulkLoadMetrics("inflationWinners", 0)
194199
.TimeScope();
195200

src/bucket/BucketListSnapshot.h

+16-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct BucketLevelSnapshot
2525
BucketLevelSnapshot(BucketLevel const& level);
2626
};
2727

28-
class BucketListSnapshot
28+
class BucketListSnapshot : public NonMovable
2929
{
3030
private:
3131
std::vector<BucketLevelSnapshot> mLevels;
@@ -36,15 +36,28 @@ class BucketListSnapshot
3636
public:
3737
BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq);
3838

39+
// Only allow copies via constructor
40+
BucketListSnapshot(BucketListSnapshot const& snapshot);
41+
BucketListSnapshot& operator=(BucketListSnapshot const&) = delete;
42+
3943
std::vector<BucketLevelSnapshot> const& getLevels() const;
4044
uint32_t getLedgerSeq() const;
4145
};
4246

43-
// A lightweight wrapper around the BucketList for thread safe BucketListDB
44-
// lookups
47+
// A lightweight wrapper around BucketListSnapshot for thread safe BucketListDB
48+
// lookups.
49+
//
50+
// Any thread that needs to perform BucketList lookups should retrieve
51+
// a single SearchableBucketListSnapshot instance from
52+
// BucketListSnapshotManager. On each lookup, the SearchableBucketListSnapshot
53+
// instance will check that the current snapshot is up to date via the
54+
// BucketListSnapshotManager and will be refreshed accordingly. Callers can
55+
// assume SearchableBucketListSnapshot is always up to date.
4556
class SearchableBucketListSnapshot : public NonMovableOrCopyable
4657
{
4758
BucketSnapshotManager const& mSnapshotManager;
59+
60+
// Snapshot managed by SnapshotManager
4861
std::unique_ptr<BucketListSnapshot const> mSnapshot{};
4962

5063
// Loops through all buckets, starting with curr at level 0, then snap at

src/bucket/BucketManagerImpl.cpp

+66-60
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,13 @@ BucketManagerImpl::initialize()
8989
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
9090
{
9191
mBucketList = std::make_unique<BucketList>();
92-
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
93-
mApp.getMetrics(),
94-
std::make_unique<BucketListSnapshot>(*mBucketList, 0));
92+
93+
if (mApp.getConfig().isUsingBucketListDB())
94+
{
95+
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
96+
mApp.getMetrics(),
97+
std::make_unique<BucketListSnapshot>(*mBucketList, 0));
98+
}
9599
}
96100
}
97101

@@ -185,7 +189,7 @@ std::string const&
185189
BucketManagerImpl::getTmpDir()
186190
{
187191
ZoneScoped;
188-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
192+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
189193
if (!mWorkDir)
190194
{
191195
TmpDir t = mTmpDirManager->tmpDir("bucket");
@@ -268,7 +272,8 @@ BucketManagerImpl::getBucketList()
268272
BucketSnapshotManager&
269273
BucketManagerImpl::getBucketSnapshotManager() const
270274
{
271-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
275+
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
276+
releaseAssert(mSnapshotManager);
272277
return *mSnapshotManager;
273278
}
274279

@@ -281,7 +286,7 @@ BucketManagerImpl::getMergeTimer()
281286
MergeCounters
282287
BucketManagerImpl::readMergeCounters()
283288
{
284-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
289+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
285290
return mMergeCounters;
286291
}
287292

@@ -371,7 +376,7 @@ MergeCounters::operator==(MergeCounters const& other) const
371376
void
372377
BucketManagerImpl::incrMergeCounters(MergeCounters const& delta)
373378
{
374-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
379+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
375380
mMergeCounters += delta;
376381
}
377382

@@ -397,7 +402,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
397402
{
398403
ZoneScoped;
399404
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
400-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
405+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
401406

402407
if (mergeKey)
403408
{
@@ -481,7 +486,7 @@ BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
481486
// because it'd over-identify multiple individual inputs with the empty
482487
// output, potentially retaining far too many inputs, as lots of different
483488
// mergeKeys result in an empty output.
484-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
489+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
485490
CLOG_TRACE(Bucket, "BucketManager::noteEmptyMergeOutput({})", mergeKey);
486491
mLiveFutures.erase(mergeKey);
487492
}
@@ -490,7 +495,7 @@ std::shared_ptr<Bucket>
490495
BucketManagerImpl::getBucketIfExists(uint256 const& hash)
491496
{
492497
ZoneScoped;
493-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
498+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
494499
auto i = mSharedBuckets.find(hash);
495500
if (i != mSharedBuckets.end())
496501
{
@@ -507,7 +512,7 @@ std::shared_ptr<Bucket>
507512
BucketManagerImpl::getBucketByHash(uint256 const& hash)
508513
{
509514
ZoneScoped;
510-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
515+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
511516
if (isZero(hash))
512517
{
513518
return std::make_shared<Bucket>();
@@ -540,7 +545,7 @@ std::shared_future<std::shared_ptr<Bucket>>
540545
BucketManagerImpl::getMergeFuture(MergeKey const& key)
541546
{
542547
ZoneScoped;
543-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
548+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
544549
MergeCounters mc;
545550
auto i = mLiveFutures.find(key);
546551
if (i == mLiveFutures.end())
@@ -586,7 +591,7 @@ BucketManagerImpl::putMergeFuture(
586591
{
587592
ZoneScoped;
588593
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
589-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
594+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
590595
CLOG_TRACE(
591596
Bucket,
592597
"BucketManager::putMergeFuture storing future for running merge {}",
@@ -598,7 +603,7 @@ BucketManagerImpl::putMergeFuture(
598603
void
599604
BucketManagerImpl::clearMergeFuturesForTesting()
600605
{
601-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
606+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
602607
mLiveFutures.clear();
603608
}
604609
#endif
@@ -697,7 +702,7 @@ BucketManagerImpl::cleanupStaleFiles()
697702
return;
698703
}
699704

700-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
705+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
701706
auto referenced = getAllReferencedBuckets();
702707
std::transform(std::begin(mSharedBuckets), std::end(mSharedBuckets),
703708
std::inserter(referenced, std::end(referenced)),
@@ -727,7 +732,7 @@ void
727732
BucketManagerImpl::forgetUnreferencedBuckets()
728733
{
729734
ZoneScoped;
730-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
735+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
731736
auto referenced = getAllReferencedBuckets();
732737
auto blReferenced = getBucketListReferencedBuckets();
733738

@@ -837,8 +842,12 @@ BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
837842
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
838843
liveEntries, deadEntries);
839844
mBucketListSizeCounter.set_count(mBucketList->getSize());
840-
mSnapshotManager->updateCurrentSnapshot(
841-
std::make_unique<BucketListSnapshot>(*mBucketList, currLedger));
845+
846+
if (app.getConfig().isUsingBucketListDB())
847+
{
848+
mSnapshotManager->updateCurrentSnapshot(
849+
std::make_unique<BucketListSnapshot>(*mBucketList, currLedger));
850+
}
842851
}
843852

844853
#ifdef BUILD_TESTS
@@ -978,60 +987,57 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
978987
ZoneScoped;
979988
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
980989

990+
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
981991
{
982-
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
992+
auto curr = getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
993+
auto snap = getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
994+
if (!(curr && snap))
983995
{
984-
auto curr =
985-
getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
986-
auto snap =
987-
getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
988-
if (!(curr && snap))
989-
{
990-
throw std::runtime_error("Missing bucket files while assuming "
991-
"saved BucketList state");
992-
}
996+
throw std::runtime_error("Missing bucket files while assuming "
997+
"saved BucketList state");
998+
}
993999

994-
auto const& nextFuture = has.currentBuckets.at(i).next;
995-
std::shared_ptr<Bucket> nextBucket = nullptr;
996-
if (nextFuture.hasOutputHash())
1000+
auto const& nextFuture = has.currentBuckets.at(i).next;
1001+
std::shared_ptr<Bucket> nextBucket = nullptr;
1002+
if (nextFuture.hasOutputHash())
1003+
{
1004+
nextBucket =
1005+
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
1006+
if (!nextBucket)
9971007
{
998-
nextBucket =
999-
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
1000-
if (!nextBucket)
1001-
{
1002-
throw std::runtime_error(
1003-
"Missing future bucket files while "
1004-
"assuming saved BucketList state");
1005-
}
1008+
throw std::runtime_error("Missing future bucket files while "
1009+
"assuming saved BucketList state");
10061010
}
1011+
}
10071012

1008-
// Buckets on the BucketList should always be indexed when
1009-
// BucketListDB enabled
1010-
if (mApp.getConfig().isUsingBucketListDB())
1013+
// Buckets on the BucketList should always be indexed when
1014+
// BucketListDB enabled
1015+
if (mApp.getConfig().isUsingBucketListDB())
1016+
{
1017+
releaseAssert(curr->isEmpty() || curr->isIndexed());
1018+
releaseAssert(snap->isEmpty() || snap->isIndexed());
1019+
if (nextBucket)
10111020
{
1012-
releaseAssert(curr->isEmpty() || curr->isIndexed());
1013-
releaseAssert(snap->isEmpty() || snap->isIndexed());
1014-
if (nextBucket)
1015-
{
1016-
releaseAssert(nextBucket->isEmpty() ||
1017-
nextBucket->isIndexed());
1018-
}
1021+
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
10191022
}
1020-
1021-
mBucketList->getLevel(i).setCurr(curr);
1022-
mBucketList->getLevel(i).setSnap(snap);
1023-
mBucketList->getLevel(i).setNext(nextFuture);
10241023
}
10251024

1026-
if (restartMerges)
1027-
{
1028-
mBucketList->restartMerges(mApp, maxProtocolVersion,
1029-
has.currentLedger);
1030-
}
1025+
mBucketList->getLevel(i).setCurr(curr);
1026+
mBucketList->getLevel(i).setSnap(snap);
1027+
mBucketList->getLevel(i).setNext(nextFuture);
1028+
}
1029+
1030+
if (restartMerges)
1031+
{
1032+
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
10311033
}
10321034

1033-
mSnapshotManager->updateCurrentSnapshot(
1034-
std::make_unique<BucketListSnapshot>(*mBucketList, has.currentLedger));
1035+
if (mApp.getConfig().isUsingBucketListDB())
1036+
{
1037+
mSnapshotManager->updateCurrentSnapshot(
1038+
std::make_unique<BucketListSnapshot>(*mBucketList,
1039+
has.currentLedger));
1040+
}
10351041
cleanupStaleFiles();
10361042
}
10371043

src/bucket/BucketManagerImpl.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class BucketManagerImpl : public BucketManager
4343
std::unique_ptr<TmpDirManager> mTmpDirManager;
4444
std::unique_ptr<TmpDir> mWorkDir;
4545
std::map<Hash, std::shared_ptr<Bucket>> mSharedBuckets;
46+
mutable std::recursive_mutex mBucketMutex;
4647
std::unique_ptr<std::string> mLockedBucketDir;
4748
medida::Meter& mBucketObjectInsertBatch;
4849
medida::Timer& mBucketAddBatch;
@@ -54,11 +55,6 @@ class BucketManagerImpl : public BucketManager
5455
EvictionCounters mBucketListEvictionCounters;
5556
MergeCounters mMergeCounters;
5657

57-
// Lock for managing raw Bucket files or the bucket directory. This lock is
58-
// only required for file access, but is not required for logical changes to
59-
// the BucketList (i.e. addBatch).
60-
mutable std::recursive_mutex mBucketFileMutex;
61-
6258
bool const mDeleteEntireBucketDirInDtor;
6359

6460
// Records bucket-merges that are currently _live_ in some FutureBucket, in

0 commit comments

Comments
 (0)