Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 725356c

Browse files
committedJan 30, 2024··
Added concurrency locks for BucketListDB
1 parent 8625d56 commit 725356c

8 files changed

+127
-54
lines changed
 

‎src/bucket/BucketManager.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ class BucketManager : NonMovableOrCopyable
208208
std::unique_ptr<BucketIndex const>&& index) = 0;
209209

210210
virtual std::unique_ptr<SearchableBucketListSnapshot const>
211-
getSearchableBucketListSnapshot(bool isMainThread = false) const = 0;
211+
getSearchableBucketListSnapshot() const = 0;
212212

213213
// Scans BucketList for non-live entries to evict starting at the entry
214214
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries

‎src/bucket/BucketManagerImpl.cpp

+62-47
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include "bucket/BucketInputIterator.h"
88
#include "bucket/BucketList.h"
99
#include "bucket/BucketOutputIterator.h"
10-
#include "bucket/SearcahbleBucketListSnapshot.h"
10+
#include "bucket/SearchableBucketListSnapshot.h"
1111
#include "crypto/Hex.h"
1212
#include "history/HistoryManager.h"
1313
#include "historywork/VerifyBucketWork.h"
@@ -180,7 +180,7 @@ std::string const&
180180
BucketManagerImpl::getTmpDir()
181181
{
182182
ZoneScoped;
183-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
183+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
184184
if (!mWorkDir)
185185
{
186186
TmpDir t = mTmpDirManager->tmpDir("bucket");
@@ -269,7 +269,7 @@ BucketManagerImpl::getMergeTimer()
269269
MergeCounters
270270
BucketManagerImpl::readMergeCounters()
271271
{
272-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
272+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
273273
return mMergeCounters;
274274
}
275275

@@ -359,7 +359,7 @@ MergeCounters::operator==(MergeCounters const& other) const
359359
void
360360
BucketManagerImpl::incrMergeCounters(MergeCounters const& delta)
361361
{
362-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
362+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
363363
mMergeCounters += delta;
364364
}
365365

@@ -385,7 +385,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
385385
{
386386
ZoneScoped;
387387
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
388-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
388+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
389389

390390
if (mergeKey)
391391
{
@@ -469,7 +469,7 @@ BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
469469
// because it'd over-identify multiple individual inputs with the empty
470470
// output, potentially retaining far too many inputs, as lots of different
471471
// mergeKeys result in an empty output.
472-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
472+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
473473
CLOG_TRACE(Bucket, "BucketManager::noteEmptyMergeOutput({})", mergeKey);
474474
mLiveFutures.erase(mergeKey);
475475
}
@@ -478,7 +478,7 @@ std::shared_ptr<Bucket>
478478
BucketManagerImpl::getBucketIfExists(uint256 const& hash)
479479
{
480480
ZoneScoped;
481-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
481+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
482482
auto i = mSharedBuckets.find(hash);
483483
if (i != mSharedBuckets.end())
484484
{
@@ -495,7 +495,7 @@ std::shared_ptr<Bucket>
495495
BucketManagerImpl::getBucketByHash(uint256 const& hash)
496496
{
497497
ZoneScoped;
498-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
498+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
499499
if (isZero(hash))
500500
{
501501
return std::make_shared<Bucket>();
@@ -528,7 +528,7 @@ std::shared_future<std::shared_ptr<Bucket>>
528528
BucketManagerImpl::getMergeFuture(MergeKey const& key)
529529
{
530530
ZoneScoped;
531-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
531+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
532532
MergeCounters mc;
533533
auto i = mLiveFutures.find(key);
534534
if (i == mLiveFutures.end())
@@ -574,7 +574,7 @@ BucketManagerImpl::putMergeFuture(
574574
{
575575
ZoneScoped;
576576
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
577-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
577+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
578578
CLOG_TRACE(
579579
Bucket,
580580
"BucketManager::putMergeFuture storing future for running merge {}",
@@ -586,7 +586,7 @@ BucketManagerImpl::putMergeFuture(
586586
void
587587
BucketManagerImpl::clearMergeFuturesForTesting()
588588
{
589-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
589+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
590590
mLiveFutures.clear();
591591
}
592592
#endif
@@ -685,7 +685,7 @@ BucketManagerImpl::cleanupStaleFiles()
685685
return;
686686
}
687687

688-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
688+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
689689
auto referenced = getAllReferencedBuckets();
690690
std::transform(std::begin(mSharedBuckets), std::end(mSharedBuckets),
691691
std::inserter(referenced, std::end(referenced)),
@@ -715,7 +715,7 @@ void
715715
BucketManagerImpl::forgetUnreferencedBuckets()
716716
{
717717
ZoneScoped;
718-
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
718+
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
719719
auto referenced = getAllReferencedBuckets();
720720
auto blReferenced = getBucketListReferencedBuckets();
721721

@@ -822,8 +822,11 @@ BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
822822
auto timer = mBucketAddBatch.TimeScope();
823823
mBucketObjectInsertBatch.Mark(initEntries.size() + liveEntries.size() +
824824
deadEntries.size());
825-
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
826-
liveEntries, deadEntries);
825+
{
826+
std::lock_guard<std::recursive_mutex> lock(mBucketSnapshotMutex);
827+
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
828+
liveEntries, deadEntries);
829+
}
827830
mBucketListSizeCounter.set_count(mBucketList->getSize());
828831
}
829832

@@ -905,11 +908,15 @@ BucketManagerImpl::scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
905908
}
906909

907910
std::unique_ptr<SearchableBucketListSnapshot const>
908-
BucketManagerImpl::getSearchableBucketListSnapshot(bool isMainThread) const
911+
BucketManagerImpl::getSearchableBucketListSnapshot() const
909912
{
910-
// TODO: Lock this
911913
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB() && mBucketList);
912-
return std::make_unique<SearchableBucketListSnapshot>(mApp, *mBucketList);
914+
915+
std::lock_guard<std::recursive_mutex> lock(mBucketSnapshotMutex);
916+
917+
// Note: cannot use make_unique due to private constructor
918+
return std::unique_ptr<SearchableBucketListSnapshot>(
919+
new SearchableBucketListSnapshot(mApp, *mBucketList));
913920
}
914921

915922
medida::Meter&
@@ -972,47 +979,55 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
972979
ZoneScoped;
973980
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
974981

975-
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
976982
{
977-
auto curr = getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
978-
auto snap = getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
979-
if (!(curr && snap))
983+
std::lock_guard<std::recursive_mutex> lock(mBucketSnapshotMutex);
984+
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
980985
{
981-
throw std::runtime_error(
982-
"Missing bucket files while assuming saved BucketList state");
983-
}
986+
auto curr =
987+
getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
988+
auto snap =
989+
getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
990+
if (!(curr && snap))
991+
{
992+
throw std::runtime_error("Missing bucket files while assuming "
993+
"saved BucketList state");
994+
}
984995

985-
auto const& nextFuture = has.currentBuckets.at(i).next;
986-
std::shared_ptr<Bucket> nextBucket = nullptr;
987-
if (nextFuture.hasOutputHash())
988-
{
989-
nextBucket =
990-
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
991-
if (!nextBucket)
996+
auto const& nextFuture = has.currentBuckets.at(i).next;
997+
std::shared_ptr<Bucket> nextBucket = nullptr;
998+
if (nextFuture.hasOutputHash())
992999
{
993-
throw std::runtime_error("Missing future bucket files while "
994-
"assuming saved BucketList state");
1000+
nextBucket =
1001+
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
1002+
if (!nextBucket)
1003+
{
1004+
throw std::runtime_error(
1005+
"Missing future bucket files while "
1006+
"assuming saved BucketList state");
1007+
}
9951008
}
996-
}
9971009

998-
// Buckets on the BucketList should always be indexed when BucketListDB
999-
// enabled
1000-
if (mApp.getConfig().isUsingBucketListDB())
1001-
{
1002-
releaseAssert(curr->isEmpty() || curr->isIndexed());
1003-
releaseAssert(snap->isEmpty() || snap->isIndexed());
1004-
if (nextBucket)
1010+
// Buckets on the BucketList should always be indexed when
1011+
// BucketListDB enabled
1012+
if (mApp.getConfig().isUsingBucketListDB())
10051013
{
1006-
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
1014+
releaseAssert(curr->isEmpty() || curr->isIndexed());
1015+
releaseAssert(snap->isEmpty() || snap->isIndexed());
1016+
if (nextBucket)
1017+
{
1018+
releaseAssert(nextBucket->isEmpty() ||
1019+
nextBucket->isIndexed());
1020+
}
10071021
}
1022+
1023+
mBucketList->getLevel(i).setCurr(curr);
1024+
mBucketList->getLevel(i).setSnap(snap);
1025+
mBucketList->getLevel(i).setNext(nextFuture);
10081026
}
10091027

1010-
mBucketList->getLevel(i).setCurr(curr);
1011-
mBucketList->getLevel(i).setSnap(snap);
1012-
mBucketList->getLevel(i).setNext(nextFuture);
1028+
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
10131029
}
10141030

1015-
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
10161031
cleanupStaleFiles();
10171032
}
10181033

‎src/bucket/BucketManagerImpl.h

+11-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ class BucketManagerImpl : public BucketManager
4141
std::unique_ptr<TmpDirManager> mTmpDirManager;
4242
std::unique_ptr<TmpDir> mWorkDir;
4343
std::map<Hash, std::shared_ptr<Bucket>> mSharedBuckets;
44-
mutable std::recursive_mutex mBucketMutex;
4544
std::unique_ptr<std::string> mLockedBucketDir;
4645
medida::Meter& mBucketObjectInsertBatch;
4746
medida::Timer& mBucketAddBatch;
@@ -53,6 +52,16 @@ class BucketManagerImpl : public BucketManager
5352
EvictionCounters mBucketListEvictionCounters;
5453
MergeCounters mMergeCounters;
5554

55+
// Lock for managing raw Bucket files or the bucket directory. This lock is
56+
// only required for file access, but is not required for logical changes to
57+
// the BucketList (i.e. addBatch).
58+
mutable std::recursive_mutex mBucketFileMutex;
59+
60+
// Lock for logical BucketList changes and snapshots (i.e. addBatch,
61+
// getSearchableSnapshot). This lock is not required for raw Bucket file
62+
// management.
63+
mutable std::recursive_mutex mBucketSnapshotMutex;
64+
5665
bool const mDeleteEntireBucketDirInDtor;
5766

5867
// Records bucket-merges that are currently _live_ in some FutureBucket, in
@@ -135,7 +144,7 @@ class BucketManagerImpl : public BucketManager
135144
uint32_t ledgerSeq) override;
136145

137146
std::unique_ptr<SearchableBucketListSnapshot const>
138-
getSearchableBucketListSnapshot(bool isMainThread) const override;
147+
getSearchableBucketListSnapshot() const override;
139148

140149
medida::Meter& getBloomMissMeter() const override;
141150
medida::Meter& getBloomLookupMeter() const override;

‎src/bucket/SearchableBucketListSnapshot.cpp

+27
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ SearchableBucketListSnapshot::getPointLoadTimer(LedgerEntryType t) const
4848
return iter->second;
4949
}
5050

51+
bool
52+
SearchableBucketListSnapshot::isWithinAllowedLedgerDrift(
53+
uint32_t allowedLedgerDrift) const
54+
{
55+
auto currLCL = mApp.getLedgerManager().getLastClosedLedgerNum();
56+
57+
// Edge case: genesis ledger
58+
auto minimumLCL =
59+
allowedLedgerDrift > currLCL ? 0 : currLCL - allowedLedgerDrift;
60+
61+
return mLCL >= minimumLCL;
62+
}
63+
5164
void
5265
SearchableBucketListSnapshot::loopAllBuckets(
5366
std::function<bool(SearchableBucketSnapshot const&)> f) const
@@ -77,6 +90,9 @@ SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) const
7790
ZoneScoped;
7891
auto timer = getPointLoadTimer(k.type()).TimeScope();
7992

93+
// Snapshots not currently supported, all access must be up to date
94+
releaseAssert(isWithinAllowedLedgerDrift(0));
95+
8096
std::shared_ptr<LedgerEntry> result{};
8197

8298
auto f = [&](SearchableBucketSnapshot const& b) {
@@ -106,6 +122,9 @@ SearchableBucketListSnapshot::loadKeys(
106122
ZoneScoped;
107123
auto timer = recordBulkLoadMetrics("prefetch", inKeys.size()).TimeScope();
108124

125+
// Snapshots not currently supported, all access must be up to date
126+
releaseAssert(isWithinAllowedLedgerDrift(0));
127+
109128
std::vector<LedgerEntry> entries;
110129

111130
// Make a copy of the key set, this loop is destructive
@@ -126,6 +145,9 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
126145
ZoneScoped;
127146
auto timer = recordBulkLoadMetrics("poolshareTrustlines", 0).TimeScope();
128147

148+
// Snapshots not currently supported, all access must be up to date
149+
releaseAssert(isWithinAllowedLedgerDrift(0));
150+
129151
UnorderedMap<LedgerKey, LedgerEntry> liquidityPoolToTrustline;
130152
UnorderedSet<LedgerKey> deadTrustlines;
131153
LedgerKeySet liquidityPoolKeysToSearch;
@@ -172,6 +194,9 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
172194
ZoneScoped;
173195
auto timer = recordBulkLoadMetrics("inflationWinners", 0).TimeScope();
174196

197+
// Snapshots not currently supported, all access must be up to date
198+
releaseAssert(isWithinAllowedLedgerDrift(0));
199+
175200
UnorderedMap<AccountID, int64_t> voteCount;
176201
UnorderedSet<AccountID> seen;
177202

@@ -257,6 +282,8 @@ SearchableBucketLevelSnapshot::SearchableBucketLevelSnapshot(
257282
{
258283
}
259284

285+
// This is not thread safe, must call while holding
286+
// BucketManager::mBucketSnapshotMutex.
260287
SearchableBucketListSnapshot::SearchableBucketListSnapshot(Application& app,
261288
BucketList const& bl)
262289
: mApp(app)

‎src/bucket/SearchableBucketListSnapshot.h

+9-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
66

77
#include "bucket/BucketList.h"
8+
#include "bucket/BucketManagerImpl.h"
89
#include "bucket/SearchableBucketSnapshot.h"
910

1011
namespace stellar
@@ -48,11 +49,13 @@ class SearchableBucketListSnapshot : public NonMovable
4849

4950
medida::Timer& getPointLoadTimer(LedgerEntryType t) const;
5051

51-
public:
52-
// TODO: Private constructor so only BucketManager can create this class
53-
// with a mutex check
52+
// Return true is this snapshot is from a ledger within [lcl -
53+
// allowedLedgerDrift, lcl]
54+
bool isWithinAllowedLedgerDrift(uint32_t allowedLedgerDrift) const;
55+
5456
SearchableBucketListSnapshot(Application& app, BucketList const& bl);
5557

58+
public:
5659
std::vector<LedgerEntry>
5760
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const;
5861

@@ -64,5 +67,8 @@ class SearchableBucketListSnapshot : public NonMovable
6467
int64_t minBalance) const;
6568

6669
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k) const;
70+
71+
friend std::unique_ptr<SearchableBucketListSnapshot const>
72+
BucketManagerImpl::getSearchableBucketListSnapshot() const;
6773
};
6874
}

‎src/bucket/SearchableBucketSnapshot.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,15 @@ class SearchableBucketSnapshot : public NonMovable
3636
std::streamoff pos,
3737
size_t pageSize) const;
3838

39-
public:
39+
// Warning: constructor not thread safe
4040
SearchableBucketSnapshot(std::shared_ptr<Bucket const> const b);
41+
42+
// Only allow copy constructor, is threadsafe
4143
SearchableBucketSnapshot(SearchableBucketSnapshot const& b);
4244
SearchableBucketSnapshot&
4345
operator=(SearchableBucketSnapshot const& b) = delete;
4446

47+
public:
4548
bool isEmpty() const;
4649
std::shared_ptr<Bucket const> getRawBucket() const;
4750

@@ -63,5 +66,7 @@ class SearchableBucketSnapshot : public NonMovable
6366
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
6467
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
6568
LedgerKeySet& liquidityPoolKeys) const;
69+
70+
friend struct SearchableBucketLevelSnapshot;
6671
};
6772
}

‎src/ledger/LedgerManagerImpl.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -427,12 +427,14 @@ LedgerManagerImpl::getDatabase()
427427
uint32_t
428428
LedgerManagerImpl::getLastMaxTxSetSize() const
429429
{
430+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
430431
return mLastClosedLedger.header.maxTxSetSize;
431432
}
432433

433434
uint32_t
434435
LedgerManagerImpl::getLastMaxTxSetSizeOps() const
435436
{
437+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
436438
auto n = mLastClosedLedger.header.maxTxSetSize;
437439
return protocolVersionStartsFrom(mLastClosedLedger.header.ledgerVersion,
438440
ProtocolVersion::V_11)
@@ -484,6 +486,7 @@ LedgerManagerImpl::maxSorobanTransactionResources()
484486
int64_t
485487
LedgerManagerImpl::getLastMinBalance(uint32_t ownerCount) const
486488
{
489+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
487490
auto const& lh = mLastClosedLedger.header;
488491
if (protocolVersionIsBefore(lh.ledgerVersion, ProtocolVersion::V_9))
489492
return (2 + ownerCount) * lh.baseReserve;
@@ -494,18 +497,21 @@ LedgerManagerImpl::getLastMinBalance(uint32_t ownerCount) const
494497
uint32_t
495498
LedgerManagerImpl::getLastReserve() const
496499
{
500+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
497501
return mLastClosedLedger.header.baseReserve;
498502
}
499503

500504
uint32_t
501505
LedgerManagerImpl::getLastTxFee() const
502506
{
507+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
503508
return mLastClosedLedger.header.baseFee;
504509
}
505510

506511
LedgerHeaderHistoryEntry const&
507512
LedgerManagerImpl::getLastClosedLedgerHeader() const
508513
{
514+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
509515
return mLastClosedLedger;
510516
}
511517

@@ -524,12 +530,14 @@ LedgerManagerImpl::getLastClosedLedgerHAS()
524530
uint32_t
525531
LedgerManagerImpl::getLastClosedLedgerNum() const
526532
{
533+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
527534
return mLastClosedLedger.header.ledgerSeq;
528535
}
529536

530537
SorobanNetworkConfig&
531538
LedgerManagerImpl::getSorobanNetworkConfigInternal()
532539
{
540+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
533541
releaseAssert(mSorobanNetworkConfig);
534542
return *mSorobanNetworkConfig;
535543
}
@@ -1275,6 +1283,7 @@ LedgerManagerImpl::advanceLedgerPointers(LedgerHeader const& header,
12751283
ledgerAbbrev(header, ledgerHash));
12761284
}
12771285

1286+
std::lock_guard<std::mutex> lock(mLedgerPointerLock);
12781287
mLastClosedLedger.hash = ledgerHash;
12791288
mLastClosedLedger.header = header;
12801289
}

‎src/ledger/LedgerManagerImpl.h

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class LedgerManagerImpl : public LedgerManager
5151
LedgerHeaderHistoryEntry mLastClosedLedger;
5252
std::optional<SorobanNetworkConfig> mSorobanNetworkConfig;
5353

54+
mutable std::mutex mLedgerPointerLock;
55+
5456
medida::Timer& mTransactionApply;
5557
medida::Histogram& mTransactionCount;
5658
medida::Histogram& mOperationCount;

0 commit comments

Comments
 (0)
Please sign in to comment.