Skip to content

Commit be126ad

Browse files
committed
BucketSnapshotManager manages all concurrency
1 parent 9a4efc4 commit be126ad

15 files changed

+156
-138
lines changed

src/bucket/Bucket.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
165165
static uint32_t
166166
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);
167167

168-
friend class SearchableBucketSnapshot;
168+
friend class BucketSnapshot;
169169
};
170170
}

src/bucket/BucketList.cpp

+1-13
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
#include "bucket/Bucket.h"
77
#include "bucket/BucketInputIterator.h"
88
#include "bucket/BucketManager.h"
9+
#include "bucket/BucketSnapshot.h"
910
#include "bucket/LedgerCmp.h"
10-
#include "bucket/SearchableBucketSnapshot.h"
1111
#include "crypto/Hex.h"
1212
#include "crypto/Random.h"
1313
#include "crypto/SHA.h"
@@ -454,12 +454,6 @@ BucketList::getLevel(uint32_t i)
454454
return mLevels.at(i);
455455
}
456456

457-
uint32_t
458-
BucketList::getLedgerSeq() const
459-
{
460-
return mLedgerSeq;
461-
}
462-
463457
void
464458
BucketList::resolveAnyReadyFutures()
465459
{
@@ -532,10 +526,6 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
532526
{
533527
ZoneScoped;
534528
releaseAssert(currLedger > 0);
535-
releaseAssert(threadIsMain());
536-
537-
std::lock_guard<std::recursive_mutex> lock(
538-
app.getBucketManager().getBucketListMutex());
539529

540530
std::vector<std::shared_ptr<Bucket>> shadows;
541531
for (auto& level : mLevels)
@@ -647,8 +637,6 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
647637
{
648638
resolveAnyReadyFutures();
649639
}
650-
651-
mLedgerSeq = currLedger;
652640
}
653641

654642
void

src/bucket/BucketList.h

-8
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,6 @@ class BucketList
431431
// nullopt and be initialized at the start of the next cycle.
432432
std::optional<EvictionStatistics> mEvictionStatistics;
433433

434-
// Current ledgerSeq for BucketList state
435-
uint32_t mLedgerSeq{};
436-
437434
public:
438435
// Number of bucket levels in the bucketlist. Every bucketlist in the system
439436
// will have this many levels and it effectively gets wired-in to the
@@ -481,12 +478,8 @@ class BucketList
481478
// Return level `i` of the BucketList.
482479
BucketLevel const& getLevel(uint32_t i) const;
483480

484-
// Return level `i` of the BucketList.
485481
BucketLevel& getLevel(uint32_t i);
486482

487-
// Not thread safe, must hold mBucketListMutex
488-
uint32_t getLedgerSeq() const;
489-
490483
// Return a cumulative hash of the entire bucketlist; this is the hash of
491484
// the concatenation of each level's hash, each of which in turn is the hash
492485
// of the concatenation of the hashes of the `curr` and `snap` buckets.
@@ -535,7 +528,6 @@ class BucketList
535528
// should have spilled due to passing through `currLedger`. The `currLedger`
536529
// and `currProtocolVersion` values should be taken from the ledger at which
537530
// this batch is being added.
538-
// Not thread safe, must hold mBucketListMutex.
539531
void addBatch(Application& app, uint32_t currLedger,
540532
uint32_t currLedgerProtocol,
541533
std::vector<LedgerEntry> const& initEntries,

src/bucket/SearchableBucketListSnapshot.cpp src/bucket/BucketListSnapshot.cpp

+42-22
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// under the Apache License, Version 2.0. See the COPYING file at the root
33
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
44

5-
#include "bucket/SearchableBucketListSnapshot.h"
5+
#include "bucket/BucketListSnapshot.h"
66
#include "bucket/BucketInputIterator.h"
77
#include "crypto/SecretKey.h"
88
#include "ledger/LedgerTxn.h"
@@ -13,14 +13,40 @@
1313
namespace stellar
1414
{
1515

16+
BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
17+
: mLedgerSeq(ledgerSeq)
18+
{
19+
releaseAssert(threadIsMain());
20+
21+
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
22+
{
23+
auto const& level = bl.getLevel(i);
24+
mLevels.emplace_back(BucketLevelSnapshot(level));
25+
}
26+
}
27+
28+
std::vector<BucketLevelSnapshot> const&
29+
BucketListSnapshot::getLevels() const
30+
{
31+
return mLevels;
32+
}
33+
34+
uint32_t
35+
BucketListSnapshot::getLedgerSeq() const
36+
{
37+
return mLedgerSeq;
38+
}
39+
1640
void
1741
SearchableBucketListSnapshot::loopAllBuckets(
18-
std::function<bool(SearchableBucketSnapshot const&)> f) const
42+
std::function<bool(BucketSnapshot const&)> f) const
1943
{
20-
for (auto const& lev : mLevels)
44+
releaseAssert(mSnapshot);
45+
46+
for (auto const& lev : mSnapshot->getLevels())
2147
{
2248
// Return true if we should exit loop early
23-
auto processBucket = [f](SearchableBucketSnapshot const& b) {
49+
auto processBucket = [f](BucketSnapshot const& b) {
2450
if (b.isEmpty())
2551
{
2652
return false;
@@ -40,7 +66,7 @@ std::shared_ptr<LedgerEntry>
4066
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k)
4167
{
4268
ZoneScoped;
43-
mSnapshotManager.maybeUpdateSnapshot(*this);
69+
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
4470

4571
if (threadIsMain())
4672
{
@@ -58,7 +84,7 @@ SearchableBucketListSnapshot::getLedgerEntryInternal(LedgerKey const& k)
5884
{
5985
std::shared_ptr<LedgerEntry> result{};
6086

61-
auto f = [&](SearchableBucketSnapshot const& b) {
87+
auto f = [&](BucketSnapshot const& b) {
6288
auto be = b.getBucketEntry(k);
6389
if (be.has_value())
6490
{
@@ -86,7 +112,7 @@ SearchableBucketListSnapshot::loadKeysInternal(
86112

87113
// Make a copy of the key set, this loop is destructive
88114
auto keys = inKeys;
89-
auto f = [&](SearchableBucketSnapshot const& b) {
115+
auto f = [&](BucketSnapshot const& b) {
90116
b.loadKeys(keys, entries);
91117
return keys.empty();
92118
};
@@ -100,7 +126,7 @@ SearchableBucketListSnapshot::loadKeys(
100126
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys)
101127
{
102128
ZoneScoped;
103-
mSnapshotManager.maybeUpdateSnapshot(*this);
129+
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
104130

105131
if (threadIsMain())
106132
{
@@ -128,11 +154,11 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
128154

129155
// This query should only be called during TX apply
130156
releaseAssert(threadIsMain());
131-
mSnapshotManager.maybeUpdateSnapshot(*this);
157+
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
132158

133159
LedgerKeySet trustlinesToLoad;
134160

135-
auto trustLineLoop = [&](SearchableBucketSnapshot const& b) {
161+
auto trustLineLoop = [&](BucketSnapshot const& b) {
136162
for (auto const& poolID : b.getPoolIDsByAsset(asset))
137163
{
138164
LedgerKey trustlineKey(TRUSTLINE);
@@ -159,7 +185,7 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
159185
int64_t minBalance)
160186
{
161187
ZoneScoped;
162-
mSnapshotManager.maybeUpdateSnapshot(*this);
188+
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
163189

164190
// This is a legacy query, should only be called by main thread during
165191
// catchup
@@ -170,7 +196,7 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
170196
UnorderedMap<AccountID, int64_t> voteCount;
171197
UnorderedSet<AccountID> seen;
172198

173-
auto countVotesInBucket = [&](SearchableBucketSnapshot const& b) {
199+
auto countVotesInBucket = [&](BucketSnapshot const& b) {
174200
for (BucketInputIterator in(b.getRawBucket()); in; ++in)
175201
{
176202
BucketEntry const& be = *in;
@@ -246,22 +272,16 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
246272
return winners;
247273
}
248274

249-
SearchableBucketLevelSnapshot::SearchableBucketLevelSnapshot(
250-
BucketLevel const& level)
275+
BucketLevelSnapshot::BucketLevelSnapshot(BucketLevel const& level)
251276
: curr(level.getCurr()), snap(level.getSnap())
252277
{
253278
}
254279

255-
// This is not thread safe, must call while holding
256-
// BucketManager::mBucketListMutex.
257280
SearchableBucketListSnapshot::SearchableBucketListSnapshot(
258-
BucketSnapshotManager const& snapshotManager, BucketList const& bl)
281+
BucketSnapshotManager const& snapshotManager)
259282
: mSnapshotManager(snapshotManager)
260283
{
261-
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
262-
{
263-
auto const& level = bl.getLevel(i);
264-
mLevels.emplace_back(SearchableBucketLevelSnapshot(level));
265-
}
284+
// Initialize snapshot from SnapshotManager
285+
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
266286
}
267287
}

src/bucket/SearchableBucketListSnapshot.h src/bucket/BucketListSnapshot.h

+27-16
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
#include "bucket/BucketList.h"
88
#include "bucket/BucketManagerImpl.h"
9+
#include "bucket/BucketSnapshot.h"
910
#include "bucket/BucketSnapshotManager.h"
10-
#include "bucket/SearchableBucketSnapshot.h"
1111

1212
namespace medida
1313
{
@@ -17,38 +17,51 @@ class Timer;
1717
namespace stellar
1818
{
1919

20-
struct SearchableBucketLevelSnapshot
20+
struct BucketLevelSnapshot
2121
{
22-
SearchableBucketSnapshot curr;
23-
SearchableBucketSnapshot snap;
22+
BucketSnapshot curr;
23+
BucketSnapshot snap;
2424

25-
SearchableBucketLevelSnapshot(BucketLevel const& level);
25+
BucketLevelSnapshot(BucketLevel const& level);
26+
};
27+
28+
class BucketListSnapshot
29+
{
30+
private:
31+
std::vector<BucketLevelSnapshot> mLevels;
32+
33+
// ledgerSeq that this BucketList snapshot is based off of
34+
uint32_t mLedgerSeq;
35+
36+
public:
37+
BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq);
38+
39+
std::vector<BucketLevelSnapshot> const& getLevels() const;
40+
uint32_t getLedgerSeq() const;
2641
};
2742

2843
// A lightweight wrapper around the BucketList for thread safe BucketListDB
2944
// lookups
3045
class SearchableBucketListSnapshot : public NonMovableOrCopyable
3146
{
32-
std::vector<SearchableBucketLevelSnapshot> mLevels;
3347
BucketSnapshotManager const& mSnapshotManager;
34-
35-
// ledgerSeq that this BucketList snapshot is based off of
36-
uint32_t mLedgerSeq;
48+
std::unique_ptr<BucketListSnapshot const> mSnapshot{};
3749

3850
// Loops through all buckets, starting with curr at level 0, then snap at
3951
// level 0, etc. Calls f on each bucket. Exits early if function
4052
// returns true
41-
void loopAllBuckets(
42-
std::function<bool(SearchableBucketSnapshot const&)> f) const;
43-
44-
SearchableBucketListSnapshot(BucketSnapshotManager const& snapshotManager,
45-
BucketList const& bl);
53+
void loopAllBuckets(std::function<bool(BucketSnapshot const&)> f) const;
4654

4755
std::vector<LedgerEntry>
4856
loadKeysInternal(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys);
4957

5058
std::shared_ptr<LedgerEntry> getLedgerEntryInternal(LedgerKey const& k);
5159

60+
SearchableBucketListSnapshot(BucketSnapshotManager const& snapshotManager);
61+
62+
friend std::unique_ptr<SearchableBucketListSnapshot>
63+
BucketSnapshotManager::getSearchableBucketListSnapshot() const;
64+
5265
public:
5366
std::vector<LedgerEntry>
5467
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys);
@@ -61,7 +74,5 @@ class SearchableBucketListSnapshot : public NonMovableOrCopyable
6174
int64_t minBalance);
6275

6376
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k);
64-
65-
friend class BucketSnapshotManager;
6677
};
6778
}

src/bucket/BucketManager.h

-2
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,6 @@ class BucketManager : NonMovableOrCopyable
209209
virtual void maybeSetIndex(std::shared_ptr<Bucket> b,
210210
std::unique_ptr<BucketIndex const>&& index) = 0;
211211

212-
virtual std::recursive_mutex& getBucketListMutex() const = 0;
213-
214212
// Scans BucketList for non-live entries to evict starting at the entry
215213
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries
216214
// have been evicted or maxEvictionScanSize bytes have been scanned.

src/bucket/BucketManagerImpl.cpp

+9-14
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#include "bucket/Bucket.h"
77
#include "bucket/BucketInputIterator.h"
88
#include "bucket/BucketList.h"
9+
#include "bucket/BucketListSnapshot.h"
910
#include "bucket/BucketOutputIterator.h"
1011
#include "bucket/BucketSnapshotManager.h"
11-
#include "bucket/SearchableBucketListSnapshot.h"
1212
#include "crypto/Hex.h"
1313
#include "history/HistoryManager.h"
1414
#include "historywork/VerifyBucketWork.h"
@@ -90,7 +90,8 @@ BucketManagerImpl::initialize()
9090
{
9191
mBucketList = std::make_unique<BucketList>();
9292
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
93-
mApp.getMetrics(), *mBucketList, mBucketListMutex);
93+
mApp.getMetrics(),
94+
std::make_unique<BucketListSnapshot>(*mBucketList, 0));
9495
}
9596
}
9697

@@ -833,12 +834,11 @@ BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
833834
auto timer = mBucketAddBatch.TimeScope();
834835
mBucketObjectInsertBatch.Mark(initEntries.size() + liveEntries.size() +
835836
deadEntries.size());
836-
{
837-
std::lock_guard<std::recursive_mutex> lock(mBucketListMutex);
838-
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
839-
liveEntries, deadEntries);
840-
}
837+
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
838+
liveEntries, deadEntries);
841839
mBucketListSizeCounter.set_count(mBucketList->getSize());
840+
mSnapshotManager->updateCurrentSnapshot(
841+
std::make_unique<BucketListSnapshot>(*mBucketList, currLedger));
842842
}
843843

844844
#ifdef BUILD_TESTS
@@ -918,12 +918,6 @@ BucketManagerImpl::scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
918918
}
919919
}
920920

921-
std::recursive_mutex&
922-
BucketManagerImpl::getBucketListMutex() const
923-
{
924-
return mBucketListMutex;
925-
}
926-
927921
medida::Meter&
928922
BucketManagerImpl::getBloomMissMeter() const
929923
{
@@ -985,7 +979,6 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
985979
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
986980

987981
{
988-
std::lock_guard<std::recursive_mutex> lock(mBucketListMutex);
989982
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
990983
{
991984
auto curr =
@@ -1037,6 +1030,8 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
10371030
}
10381031
}
10391032

1033+
mSnapshotManager->updateCurrentSnapshot(
1034+
std::make_unique<BucketListSnapshot>(*mBucketList, has.currentLedger));
10401035
cleanupStaleFiles();
10411036
}
10421037

0 commit comments

Comments
 (0)