Skip to content

Commit b8035a1

Browse files
committedFeb 26, 2024
Enable parallel BucketsDB loads
1 parent 06e2d25 commit b8035a1

9 files changed

+105
-40
lines changed
 

‎src/bucket/Bucket.h

+6-7
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,12 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
125125
// after this function returns:
126126
// bytesToScan -= amount_bytes_scanned
127127
// maxEntriesToEvict -= entries_evicted
128-
bool
129-
scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, EvictionIterator& iter,
130-
uint32_t& bytesToScan, uint32_t& maxEntriesToEvict,
131-
uint32_t ledgerSeq,
132-
medida::Counter& entriesEvictedCounter,
133-
medida::Counter& bytesScannedForEvictionCounter,
134-
std::optional<EvictionStatistics>& stats) const;
128+
bool scanForEvictionLegacySQL(
129+
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan,
130+
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
131+
medida::Counter& entriesEvictedCounter,
132+
medida::Counter& bytesScannedForEvictionCounter,
133+
std::optional<EvictionStatistics>& stats) const;
135134

136135
// Create a fresh bucket from given vectors of init (created) and live
137136
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will

‎src/bucket/BucketManager.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,11 @@ class BucketManager : NonMovableOrCopyable
207207
virtual void maybeSetIndex(std::shared_ptr<Bucket> b,
208208
std::unique_ptr<BucketIndex const>&& index) = 0;
209209

210-
virtual std::unique_ptr<SearchableBucketListSnapshot const>
210+
virtual std::unique_ptr<SearchableBucketListSnapshot>
211211
getSearchableBucketListSnapshot() const = 0;
212212

213+
virtual std::recursive_mutex& getBucketSnapshotMutex() const = 0;
214+
213215
// Scans BucketList for non-live entries to evict starting at the entry
214216
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries
215217
// have been evicted or maxEvictionScanSize bytes have been scanned.

‎src/bucket/BucketManagerImpl.cpp

+12-6
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,7 @@ BucketManagerImpl::scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
907907
}
908908
}
909909

910-
std::unique_ptr<SearchableBucketListSnapshot const>
910+
std::unique_ptr<SearchableBucketListSnapshot>
911911
BucketManagerImpl::getSearchableBucketListSnapshot() const
912912
{
913913
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB() && mBucketList);
@@ -919,6 +919,12 @@ BucketManagerImpl::getSearchableBucketListSnapshot() const
919919
new SearchableBucketListSnapshot(mApp, *mBucketList));
920920
}
921921

922+
std::recursive_mutex&
923+
BucketManagerImpl::getBucketSnapshotMutex() const
924+
{
925+
return mBucketSnapshotMutex;
926+
}
927+
922928
medida::Meter&
923929
BucketManagerImpl::getBloomMissMeter() const
924930
{
@@ -1025,13 +1031,13 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
10251031
mBucketList->getLevel(i).setNext(nextFuture);
10261032
}
10271033

1028-
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
1034+
if (restartMerges)
1035+
{
1036+
mBucketList->restartMerges(mApp, maxProtocolVersion,
1037+
has.currentLedger);
1038+
}
10291039
}
10301040

1031-
if (restartMerges)
1032-
{
1033-
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
1034-
}
10351041
cleanupStaleFiles();
10361042
}
10371043

‎src/bucket/BucketManagerImpl.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ class BucketManagerImpl : public BucketManager
143143
void scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
144144
uint32_t ledgerSeq) override;
145145

146-
std::unique_ptr<SearchableBucketListSnapshot const>
146+
std::unique_ptr<SearchableBucketListSnapshot>
147147
getSearchableBucketListSnapshot() const override;
148+
std::recursive_mutex& getBucketSnapshotMutex() const override;
148149

149150
medida::Meter& getBloomMissMeter() const override;
150151
medida::Meter& getBloomLookupMeter() const override;

‎src/bucket/SearchableBucketListSnapshot.cpp

+28-16
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,26 @@ SearchableBucketListSnapshot::isWithinAllowedLedgerDrift(
6161
return mLCL >= minimumLCL;
6262
}
6363

64+
void
65+
SearchableBucketListSnapshot::maybeUpdateSnapshot()
66+
{
67+
auto currLCL = mApp.getLedgerManager().getLastClosedLedgerNum();
68+
if (currLCL != mLCL)
69+
{
70+
mLCL = currLCL;
71+
mLevels.clear();
72+
73+
std::lock_guard<std::recursive_mutex> lock(
74+
mApp.getBucketManager().getBucketSnapshotMutex());
75+
auto& bl = mApp.getBucketManager().getBucketList();
76+
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
77+
{
78+
auto const& level = bl.getLevel(i);
79+
mLevels.emplace_back(SearchableBucketLevelSnapshot(level));
80+
}
81+
}
82+
}
83+
6484
void
6585
SearchableBucketListSnapshot::loopAllBuckets(
6686
std::function<bool(SearchableBucketSnapshot const&)> f) const
@@ -85,13 +105,11 @@ SearchableBucketListSnapshot::loopAllBuckets(
85105
}
86106

87107
std::shared_ptr<LedgerEntry>
88-
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) const
108+
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k)
89109
{
90110
ZoneScoped;
91111
auto timer = getPointLoadTimer(k.type()).TimeScope();
92-
93-
// Snapshots not currently supported, all access must be up to date
94-
releaseAssert(isWithinAllowedLedgerDrift(0));
112+
maybeUpdateSnapshot();
95113

96114
std::shared_ptr<LedgerEntry> result{};
97115

@@ -117,13 +135,11 @@ SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) const
117135

118136
std::vector<LedgerEntry>
119137
SearchableBucketListSnapshot::loadKeys(
120-
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const
138+
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys)
121139
{
122140
ZoneScoped;
123141
auto timer = recordBulkLoadMetrics("prefetch", inKeys.size()).TimeScope();
124-
125-
// Snapshots not currently supported, all access must be up to date
126-
releaseAssert(isWithinAllowedLedgerDrift(0));
142+
maybeUpdateSnapshot();
127143

128144
std::vector<LedgerEntry> entries;
129145

@@ -140,13 +156,11 @@ SearchableBucketListSnapshot::loadKeys(
140156

141157
std::vector<LedgerEntry>
142158
SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
143-
AccountID const& accountID, Asset const& asset) const
159+
AccountID const& accountID, Asset const& asset)
144160
{
145161
ZoneScoped;
146162
auto timer = recordBulkLoadMetrics("poolshareTrustlines", 0).TimeScope();
147-
148-
// Snapshots not currently supported, all access must be up to date
149-
releaseAssert(isWithinAllowedLedgerDrift(0));
163+
maybeUpdateSnapshot();
150164

151165
UnorderedMap<LedgerKey, LedgerEntry> liquidityPoolToTrustline;
152166
UnorderedSet<LedgerKey> deadTrustlines;
@@ -189,13 +203,11 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
189203

190204
std::vector<InflationWinner>
191205
SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
192-
int64_t minBalance) const
206+
int64_t minBalance)
193207
{
194208
ZoneScoped;
195209
auto timer = recordBulkLoadMetrics("inflationWinners", 0).TimeScope();
196-
197-
// Snapshots not currently supported, all access must be up to date
198-
releaseAssert(isWithinAllowedLedgerDrift(0));
210+
maybeUpdateSnapshot();
199211

200212
UnorderedMap<AccountID, int64_t> voteCount;
201213
UnorderedSet<AccountID> seen;

‎src/bucket/SearchableBucketListSnapshot.h

+9-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct SearchableBucketLevelSnapshot
2626

2727
// A lightweight wrapper around the BucketList for thread safe BucketListDB
2828
// lookups
29-
class SearchableBucketListSnapshot : public NonMovable
29+
class SearchableBucketListSnapshot : public NonMovableOrCopyable
3030
{
3131
Application& mApp;
3232
std::vector<SearchableBucketLevelSnapshot> mLevels;
@@ -58,22 +58,25 @@ class SearchableBucketListSnapshot : public NonMovable
5858
// allowedLedgerDrift, lcl]
5959
bool isWithinAllowedLedgerDrift(uint32_t allowedLedgerDrift) const;
6060

61+
// Checks if snapshot is behind the current LCL and updates as necessary
62+
void maybeUpdateSnapshot();
63+
6164
SearchableBucketListSnapshot(Application& app, BucketList const& bl);
6265

6366
public:
6467
std::vector<LedgerEntry>
65-
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const;
68+
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys);
6669

6770
std::vector<LedgerEntry>
6871
loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
69-
Asset const& asset) const;
72+
Asset const& asset);
7073

7174
std::vector<InflationWinner> loadInflationWinners(size_t maxWinners,
72-
int64_t minBalance) const;
75+
int64_t minBalance);
7376

74-
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k) const;
77+
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k);
7578

76-
friend std::unique_ptr<SearchableBucketListSnapshot const>
79+
friend std::unique_ptr<SearchableBucketListSnapshot>
7780
BucketManagerImpl::getSearchableBucketListSnapshot() const;
7881
};
7982
}

‎src/bucket/test/BucketListTests.cpp

+42
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "bucket/BucketList.h"
1616
#include "bucket/BucketManager.h"
1717
#include "bucket/BucketOutputIterator.h"
18+
#include "bucket/SearchableBucketListSnapshot.h"
1819
#include "bucket/test/BucketTestUtils.h"
1920
#include "ledger/test/LedgerTestUtils.h"
2021
#include "lib/catch.hpp"
@@ -1114,6 +1115,47 @@ TEST_CASE_VERSIONS("eviction scan", "[bucketlist]")
11141115
});
11151116
}
11161117

1118+
TEST_CASE_VERSIONS("Searchable BucketListDB snapshots", "[bucketlist]")
1119+
{
1120+
VirtualClock clock;
1121+
Config cfg(getTestConfig(0, Config::TESTDB_IN_MEMORY_SQLITE));
1122+
cfg.EXPERIMENTAL_BUCKETLIST_DB = true;
1123+
1124+
auto app = createTestApplication<BucketTestApplication>(clock, cfg);
1125+
for_versions_from(20, *app, [&] {
1126+
LedgerManagerForBucketTests& lm = app->getLedgerManager();
1127+
auto& bm = app->getBucketManager();
1128+
1129+
auto entry =
1130+
LedgerTestUtils::generateValidLedgerEntryOfType(CLAIMABLE_BALANCE);
1131+
entry.data.claimableBalance().amount = 0;
1132+
1133+
auto searchableBL = bm.getSearchableBucketListSnapshot();
1134+
1135+
// Update entry every 5 ledgers so we can see bucket merge events
1136+
for (auto ledgerSeq = 1; ledgerSeq < 101; ++ledgerSeq)
1137+
{
1138+
if ((ledgerSeq - 1) % 5 == 0)
1139+
{
1140+
++entry.data.claimableBalance().amount;
1141+
entry.lastModifiedLedgerSeq = ledgerSeq;
1142+
lm.setNextLedgerEntryBatchForBucketTesting({}, {entry}, {});
1143+
}
1144+
else
1145+
{
1146+
lm.setNextLedgerEntryBatchForBucketTesting({}, {}, {});
1147+
}
1148+
1149+
closeLedger(*app);
1150+
1151+
// Snapshot should automatically update with latest version
1152+
auto loadedEntry =
1153+
searchableBL->getLedgerEntry(LedgerEntryKey(entry));
1154+
REQUIRE((loadedEntry && *loadedEntry == entry));
1155+
}
1156+
});
1157+
}
1158+
11171159
static std::string
11181160
formatX32(uint32_t v)
11191161
{

‎src/ledger/LedgerTxn.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -3371,7 +3371,7 @@ LedgerTxnRoot::Impl::areEntriesMissingInCacheForOffer(OfferEntry const& oe)
33713371
return false;
33723372
}
33733373

3374-
SearchableBucketListSnapshot const&
3374+
SearchableBucketListSnapshot&
33753375
LedgerTxnRoot::Impl::getSearchableBucketListSnapshot() const
33763376
{
33773377
releaseAssert(mApp.getConfig().isUsingBucketListDB());

‎src/ledger/LedgerTxnImpl.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ class LedgerTxnRoot::Impl
740740
mutable BestOffers mBestOffers;
741741
mutable uint64_t mPrefetchHits{0};
742742
mutable uint64_t mPrefetchMisses{0};
743-
mutable std::unique_ptr<SearchableBucketListSnapshot const>
743+
mutable std::unique_ptr<SearchableBucketListSnapshot>
744744
mSearchableBucketListSnapshot{};
745745

746746
size_t mBulkLoadBatchSize;
@@ -874,7 +874,7 @@ class LedgerTxnRoot::Impl
874874

875875
bool areEntriesMissingInCacheForOffer(OfferEntry const& oe);
876876

877-
SearchableBucketListSnapshot const& getSearchableBucketListSnapshot() const;
877+
SearchableBucketListSnapshot& getSearchableBucketListSnapshot() const;
878878

879879
public:
880880
// Constructor has the strong exception safety guarantee

0 commit comments

Comments
 (0)
Please sign in to comment.