Skip to content

Commit 1594eea

Browse files
committed
Minor fixes
1 parent be126ad commit 1594eea

11 files changed

+116
-101
lines changed

src/bucket/BucketList.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ BucketLevel::getNext()
6363
void
6464
BucketLevel::setNext(FutureBucket const& fb)
6565
{
66-
releaseAssert(threadIsMain());
66+
assertThreadIsMain();
6767
mNextCurr = fb;
6868
}
6969

@@ -82,7 +82,7 @@ BucketLevel::getSnap() const
8282
void
8383
BucketLevel::setCurr(std::shared_ptr<Bucket> b)
8484
{
85-
releaseAssert(threadIsMain());
85+
assertThreadIsMain();
8686
mNextCurr.clear();
8787
mCurr = b;
8888
}
@@ -117,7 +117,7 @@ BucketList::shouldMergeWithEmptyCurr(uint32_t ledger, uint32_t level)
117117
void
118118
BucketLevel::setSnap(std::shared_ptr<Bucket> b)
119119
{
120-
releaseAssert(threadIsMain());
120+
assertThreadIsMain();
121121
mSnap = b;
122122
}
123123

src/bucket/BucketList.h

+1
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ class BucketList
478478
// Return level `i` of the BucketList.
479479
BucketLevel const& getLevel(uint32_t i) const;
480480

481+
// Return level `i` of the BucketList.
481482
BucketLevel& getLevel(uint32_t i);
482483

483484
// Return a cumulative hash of the entire bucketlist; this is the hash of

src/bucket/BucketListSnapshot.cpp

+8-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace stellar
1616
BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
1717
: mLedgerSeq(ledgerSeq)
1818
{
19-
releaseAssert(threadIsMain());
19+
assertThreadIsMain();
2020

2121
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
2222
{
@@ -25,6 +25,11 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
2525
}
2626
}
2727

28+
BucketListSnapshot::BucketListSnapshot(BucketListSnapshot const& snapshot)
29+
: mLevels(snapshot.mLevels), mLedgerSeq(snapshot.mLedgerSeq)
30+
{
31+
}
32+
2833
std::vector<BucketLevelSnapshot> const&
2934
BucketListSnapshot::getLevels() const
3035
{
@@ -153,7 +158,7 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
153158
ZoneScoped;
154159

155160
// This query should only be called during TX apply
156-
releaseAssert(threadIsMain());
161+
assertThreadIsMain();
157162
mSnapshotManager.maybeUpdateSnapshot(mSnapshot);
158163

159164
LedgerKeySet trustlinesToLoad;
@@ -189,7 +194,7 @@ SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
189194

190195
// This is a legacy query, should only be called by main thread during
191196
// catchup
192-
releaseAssert(threadIsMain());
197+
assertThreadIsMain();
193198
auto timer = mSnapshotManager.recordBulkLoadMetrics("inflationWinners", 0)
194199
.TimeScope();
195200

src/bucket/BucketListSnapshot.h

+16-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct BucketLevelSnapshot
2525
BucketLevelSnapshot(BucketLevel const& level);
2626
};
2727

28-
class BucketListSnapshot
28+
class BucketListSnapshot : public NonMovable
2929
{
3030
private:
3131
std::vector<BucketLevelSnapshot> mLevels;
@@ -36,15 +36,28 @@ class BucketListSnapshot
3636
public:
3737
BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq);
3838

39+
// Only allow copies via constructor
40+
BucketListSnapshot(BucketListSnapshot const& snapshot);
41+
BucketListSnapshot& operator=(BucketListSnapshot const&) = delete;
42+
3943
std::vector<BucketLevelSnapshot> const& getLevels() const;
4044
uint32_t getLedgerSeq() const;
4145
};
4246

43-
// A lightweight wrapper around the BucketList for thread safe BucketListDB
44-
// lookups
47+
// A lightweight wrapper around BucketListSnapshot for thread safe BucketListDB
48+
// lookups.
49+
//
50+
// Any thread that needs to perform BucketList lookups should retrieve
51+
// a single SearchableBucketListSnapshot instance from
52+
// BucketListSnapshotManager. On each lookup, the SearchableBucketListSnapshot
53+
// instance will check that the current snapshot is up to date via the
54+
// BucketListSnapshotManager and will be refreshed accordingly. Callers can
55+
// assume SearchableBucketListSnapshot is always up to date.
4556
class SearchableBucketListSnapshot : public NonMovableOrCopyable
4657
{
4758
BucketSnapshotManager const& mSnapshotManager;
59+
60+
// Snapshot managed by SnapshotManager
4861
std::unique_ptr<BucketListSnapshot const> mSnapshot{};
4962

5063
// Loops through all buckets, starting with curr at level 0, then snap at

src/bucket/BucketManagerImpl.cpp

+45-52
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ std::string const&
185185
BucketManagerImpl::getTmpDir()
186186
{
187187
ZoneScoped;
188-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
188+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
189189
if (!mWorkDir)
190190
{
191191
TmpDir t = mTmpDirManager->tmpDir("bucket");
@@ -281,7 +281,7 @@ BucketManagerImpl::getMergeTimer()
281281
MergeCounters
282282
BucketManagerImpl::readMergeCounters()
283283
{
284-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
284+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
285285
return mMergeCounters;
286286
}
287287

@@ -371,7 +371,7 @@ MergeCounters::operator==(MergeCounters const& other) const
371371
void
372372
BucketManagerImpl::incrMergeCounters(MergeCounters const& delta)
373373
{
374-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
374+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
375375
mMergeCounters += delta;
376376
}
377377

@@ -397,7 +397,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
397397
{
398398
ZoneScoped;
399399
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
400-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
400+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
401401

402402
if (mergeKey)
403403
{
@@ -481,7 +481,7 @@ BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
481481
// because it'd over-identify multiple individual inputs with the empty
482482
// output, potentially retaining far too many inputs, as lots of different
483483
// mergeKeys result in an empty output.
484-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
484+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
485485
CLOG_TRACE(Bucket, "BucketManager::noteEmptyMergeOutput({})", mergeKey);
486486
mLiveFutures.erase(mergeKey);
487487
}
@@ -490,7 +490,7 @@ std::shared_ptr<Bucket>
490490
BucketManagerImpl::getBucketIfExists(uint256 const& hash)
491491
{
492492
ZoneScoped;
493-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
493+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
494494
auto i = mSharedBuckets.find(hash);
495495
if (i != mSharedBuckets.end())
496496
{
@@ -507,7 +507,7 @@ std::shared_ptr<Bucket>
507507
BucketManagerImpl::getBucketByHash(uint256 const& hash)
508508
{
509509
ZoneScoped;
510-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
510+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
511511
if (isZero(hash))
512512
{
513513
return std::make_shared<Bucket>();
@@ -540,7 +540,7 @@ std::shared_future<std::shared_ptr<Bucket>>
540540
BucketManagerImpl::getMergeFuture(MergeKey const& key)
541541
{
542542
ZoneScoped;
543-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
543+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
544544
MergeCounters mc;
545545
auto i = mLiveFutures.find(key);
546546
if (i == mLiveFutures.end())
@@ -586,7 +586,7 @@ BucketManagerImpl::putMergeFuture(
586586
{
587587
ZoneScoped;
588588
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
589-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
589+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
590590
CLOG_TRACE(
591591
Bucket,
592592
"BucketManager::putMergeFuture storing future for running merge {}",
@@ -598,7 +598,7 @@ BucketManagerImpl::putMergeFuture(
598598
void
599599
BucketManagerImpl::clearMergeFuturesForTesting()
600600
{
601-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
601+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
602602
mLiveFutures.clear();
603603
}
604604
#endif
@@ -697,7 +697,7 @@ BucketManagerImpl::cleanupStaleFiles()
697697
return;
698698
}
699699

700-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
700+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
701701
auto referenced = getAllReferencedBuckets();
702702
std::transform(std::begin(mSharedBuckets), std::end(mSharedBuckets),
703703
std::inserter(referenced, std::end(referenced)),
@@ -727,7 +727,7 @@ void
727727
BucketManagerImpl::forgetUnreferencedBuckets()
728728
{
729729
ZoneScoped;
730-
std::lock_guard<std::recursive_mutex> lock(mBucketFileMutex);
730+
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
731731
auto referenced = getAllReferencedBuckets();
732732
auto blReferenced = getBucketListReferencedBuckets();
733733

@@ -978,56 +978,49 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
978978
ZoneScoped;
979979
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
980980

981+
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
981982
{
982-
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
983+
auto curr = getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
984+
auto snap = getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
985+
if (!(curr && snap))
983986
{
984-
auto curr =
985-
getBucketByHash(hexToBin256(has.currentBuckets.at(i).curr));
986-
auto snap =
987-
getBucketByHash(hexToBin256(has.currentBuckets.at(i).snap));
988-
if (!(curr && snap))
989-
{
990-
throw std::runtime_error("Missing bucket files while assuming "
991-
"saved BucketList state");
992-
}
987+
throw std::runtime_error("Missing bucket files while assuming "
988+
"saved BucketList state");
989+
}
993990

994-
auto const& nextFuture = has.currentBuckets.at(i).next;
995-
std::shared_ptr<Bucket> nextBucket = nullptr;
996-
if (nextFuture.hasOutputHash())
991+
auto const& nextFuture = has.currentBuckets.at(i).next;
992+
std::shared_ptr<Bucket> nextBucket = nullptr;
993+
if (nextFuture.hasOutputHash())
994+
{
995+
nextBucket =
996+
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
997+
if (!nextBucket)
997998
{
998-
nextBucket =
999-
getBucketByHash(hexToBin256(nextFuture.getOutputHash()));
1000-
if (!nextBucket)
1001-
{
1002-
throw std::runtime_error(
1003-
"Missing future bucket files while "
1004-
"assuming saved BucketList state");
1005-
}
999+
throw std::runtime_error("Missing future bucket files while "
1000+
"assuming saved BucketList state");
10061001
}
1002+
}
10071003

1008-
// Buckets on the BucketList should always be indexed when
1009-
// BucketListDB enabled
1010-
if (mApp.getConfig().isUsingBucketListDB())
1004+
// Buckets on the BucketList should always be indexed when
1005+
// BucketListDB enabled
1006+
if (mApp.getConfig().isUsingBucketListDB())
1007+
{
1008+
releaseAssert(curr->isEmpty() || curr->isIndexed());
1009+
releaseAssert(snap->isEmpty() || snap->isIndexed());
1010+
if (nextBucket)
10111011
{
1012-
releaseAssert(curr->isEmpty() || curr->isIndexed());
1013-
releaseAssert(snap->isEmpty() || snap->isIndexed());
1014-
if (nextBucket)
1015-
{
1016-
releaseAssert(nextBucket->isEmpty() ||
1017-
nextBucket->isIndexed());
1018-
}
1012+
releaseAssert(nextBucket->isEmpty() || nextBucket->isIndexed());
10191013
}
1020-
1021-
mBucketList->getLevel(i).setCurr(curr);
1022-
mBucketList->getLevel(i).setSnap(snap);
1023-
mBucketList->getLevel(i).setNext(nextFuture);
10241014
}
10251015

1026-
if (restartMerges)
1027-
{
1028-
mBucketList->restartMerges(mApp, maxProtocolVersion,
1029-
has.currentLedger);
1030-
}
1016+
mBucketList->getLevel(i).setCurr(curr);
1017+
mBucketList->getLevel(i).setSnap(snap);
1018+
mBucketList->getLevel(i).setNext(nextFuture);
1019+
}
1020+
1021+
if (restartMerges)
1022+
{
1023+
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
10311024
}
10321025

10331026
mSnapshotManager->updateCurrentSnapshot(

src/bucket/BucketManagerImpl.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class BucketManagerImpl : public BucketManager
4343
std::unique_ptr<TmpDirManager> mTmpDirManager;
4444
std::unique_ptr<TmpDir> mWorkDir;
4545
std::map<Hash, std::shared_ptr<Bucket>> mSharedBuckets;
46+
mutable std::recursive_mutex mBucketMutex;
4647
std::unique_ptr<std::string> mLockedBucketDir;
4748
medida::Meter& mBucketObjectInsertBatch;
4849
medida::Timer& mBucketAddBatch;
@@ -54,11 +55,6 @@ class BucketManagerImpl : public BucketManager
5455
EvictionCounters mBucketListEvictionCounters;
5556
MergeCounters mMergeCounters;
5657

57-
// Lock for managing raw Bucket files or the bucket directory. This lock is
58-
// only required for file access, but is not required for logical changes to
59-
// the BucketList (i.e. addBatch).
60-
mutable std::recursive_mutex mBucketFileMutex;
61-
6258
bool const mDeleteEntireBucketDirInDtor;
6359

6460
// Records bucket-merges that are currently _live_ in some FutureBucket, in

src/bucket/BucketSnapshot.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,20 @@ namespace stellar
1515
BucketSnapshot::BucketSnapshot(std::shared_ptr<Bucket const> const b)
1616
: mBucket(b)
1717
{
18+
releaseAssert(mBucket);
1819
}
1920

2021
BucketSnapshot::BucketSnapshot(BucketSnapshot const& b)
2122
: mBucket(b.mBucket), mStream(nullptr)
2223
{
24+
releaseAssert(mBucket);
2325
}
2426

2527
bool
2628
BucketSnapshot::isEmpty() const
2729
{
28-
return mBucket || mBucket->isEmpty();
30+
releaseAssert(mBucket);
31+
return mBucket->isEmpty();
2932
}
3033

3134
std::optional<BucketEntry>
@@ -135,10 +138,10 @@ BucketSnapshot::getPoolIDsByAsset(Asset const& asset) const
135138
XDRInputFileStream&
136139
BucketSnapshot::getStream() const
137140
{
141+
releaseAssertOrThrow(!isEmpty());
138142
if (!mStream)
139143
{
140144
mStream = std::make_unique<XDRInputFileStream>();
141-
releaseAssertOrThrow(mBucket && !mBucket->isEmpty());
142145
mStream->open(mBucket->getFilename().string());
143146
}
144147
return *mStream;

src/bucket/BucketSnapshot.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class XDRInputFileStream;
2020
// A lightweight wrapper around Bucket for thread safe BucketListDB lookups
2121
class BucketSnapshot : public NonMovable
2222
{
23-
std::shared_ptr<Bucket const> mBucket;
23+
std::shared_ptr<Bucket const> const mBucket;
2424

2525
// Lazily-constructed and retained for read path.
2626
mutable std::unique_ptr<XDRInputFileStream> mStream{};
@@ -40,7 +40,7 @@ class BucketSnapshot : public NonMovable
4040

4141
// Only allow copy constructor, is threadsafe
4242
BucketSnapshot(BucketSnapshot const& b);
43-
BucketSnapshot& operator=(BucketSnapshot const& b) = delete;
43+
BucketSnapshot& operator=(BucketSnapshot const&) = delete;
4444

4545
public:
4646
bool isEmpty() const;

0 commit comments

Comments
 (0)