Skip to content

Commit 8625d56

Browse files
committed
Refactored BucketListDB for concurrency
1 parent 0bb4d1c commit 8625d56

16 files changed

+776
-598
lines changed

src/bucket/Bucket.cpp

+11-173
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,6 @@ Bucket::Bucket()
7777
{
7878
}
7979

80-
XDRInputFileStream&
81-
Bucket::getStream()
82-
{
83-
if (!mStream)
84-
{
85-
mStream = std::make_unique<XDRInputFileStream>();
86-
releaseAssertOrThrow(!mFilename.empty());
87-
mStream->open(mFilename.string());
88-
}
89-
return *mStream;
90-
}
91-
9280
Hash const&
9381
Bucket::getHash() const
9482
{
@@ -139,157 +127,6 @@ void
139127
Bucket::freeIndex()
140128
{
141129
mIndex.reset(nullptr);
142-
mStream.reset(nullptr);
143-
}
144-
145-
std::optional<BucketEntry>
146-
Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos,
147-
size_t pageSize)
148-
{
149-
ZoneScoped;
150-
auto& stream = getStream();
151-
stream.seek(pos);
152-
153-
BucketEntry be;
154-
if (pageSize == 0)
155-
{
156-
if (stream.readOne(be))
157-
{
158-
return std::make_optional(be);
159-
}
160-
}
161-
else if (stream.readPage(be, k, pageSize))
162-
{
163-
return std::make_optional(be);
164-
}
165-
166-
// Mark entry miss for metrics
167-
getIndex().markBloomMiss();
168-
return std::nullopt;
169-
}
170-
171-
std::optional<BucketEntry>
172-
Bucket::getBucketEntry(LedgerKey const& k)
173-
{
174-
ZoneScoped;
175-
auto pos = getIndex().lookup(k);
176-
if (pos.has_value())
177-
{
178-
return getEntryAtOffset(k, pos.value(), getIndex().getPageSize());
179-
}
180-
181-
return std::nullopt;
182-
}
183-
184-
// When searching for an entry, BucketList calls this function on every bucket.
185-
// Since the input is sorted, we do a binary search for the first key in keys.
186-
// If we find the entry, we remove the found key from keys so that later buckets
187-
// do not load shadowed entries. If we don't find the entry, we do not remove it
188-
// from keys so that it will be searched for again at a lower level.
189-
void
190-
Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
191-
std::vector<LedgerEntry>& result)
192-
{
193-
ZoneScoped;
194-
195-
auto currKeyIt = keys.begin();
196-
auto const& index = getIndex();
197-
auto indexIter = index.begin();
198-
while (currKeyIt != keys.end() && indexIter != index.end())
199-
{
200-
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
201-
indexIter = newIndexIter;
202-
if (offOp)
203-
{
204-
auto entryOp =
205-
getEntryAtOffset(*currKeyIt, *offOp, getIndex().getPageSize());
206-
if (entryOp)
207-
{
208-
if (entryOp->type() != DEADENTRY)
209-
{
210-
result.push_back(entryOp->liveEntry());
211-
}
212-
213-
currKeyIt = keys.erase(currKeyIt);
214-
continue;
215-
}
216-
}
217-
218-
++currKeyIt;
219-
}
220-
}
221-
222-
void
223-
Bucket::loadPoolShareTrustLinessByAccount(
224-
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
225-
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
226-
LedgerKeySet& liquidityPoolKeys)
227-
{
228-
ZoneScoped;
229-
230-
// Takes a LedgerKey or LedgerEntry::_data_t, returns true if entry is a
231-
// poolshare trusline for the given accountID
232-
auto trustlineCheck = [&accountID](auto const& entry) {
233-
return entry.type() == TRUSTLINE &&
234-
entry.trustLine().asset.type() == ASSET_TYPE_POOL_SHARE &&
235-
entry.trustLine().accountID == accountID;
236-
};
237-
238-
// Get upper and lower bound for poolshare trustline range associated
239-
// with this account
240-
auto searchRange = getIndex().getPoolshareTrustlineRange(accountID);
241-
if (searchRange.first == 0)
242-
{
243-
// No poolshare trustlines, exit
244-
return;
245-
}
246-
247-
BucketEntry be;
248-
auto& stream = getStream();
249-
stream.seek(searchRange.first);
250-
while (stream && stream.pos() < searchRange.second && stream.readOne(be))
251-
{
252-
LedgerEntry entry;
253-
switch (be.type())
254-
{
255-
case LIVEENTRY:
256-
case INITENTRY:
257-
entry = be.liveEntry();
258-
break;
259-
case DEADENTRY:
260-
{
261-
auto key = be.deadEntry();
262-
263-
// If we find a valid trustline key and we have not seen the
264-
// key yet, mark it as dead so we do not load a shadowed version
265-
// later
266-
if (trustlineCheck(key))
267-
{
268-
deadTrustlines.emplace(key);
269-
}
270-
continue;
271-
}
272-
case METAENTRY:
273-
default:
274-
throw std::invalid_argument("Indexed METAENTRY");
275-
}
276-
277-
// If this is a pool share trustline that matches the accountID and
278-
// is not shadowed, add it to results
279-
if (trustlineCheck(entry.data) &&
280-
deadTrustlines.find(LedgerEntryKey(entry)) == deadTrustlines.end())
281-
{
282-
auto const& poolshareID =
283-
entry.data.trustLine().asset.liquidityPoolID();
284-
285-
LedgerKey key;
286-
key.type(LIQUIDITY_POOL);
287-
key.liquidityPool().liquidityPoolID = poolshareID;
288-
289-
liquidityPoolKeyToTrustline.emplace(key, entry);
290-
liquidityPoolKeys.emplace(key);
291-
}
292-
}
293130
}
294131

295132
#ifdef BUILD_TESTS
@@ -837,12 +674,12 @@ mergeCasesWithEqualKeys(MergeCounters& mc, BucketInputIterator& oi,
837674
}
838675

839676
bool
840-
Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
841-
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
842-
uint32_t ledgerSeq,
843-
medida::Counter& entriesEvictedCounter,
844-
medida::Counter& bytesScannedForEvictionCounter,
845-
std::optional<EvictionMetrics>& metrics)
677+
Bucket::scanForEvictionLegacySQL(
678+
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint64_t& bytesToScan,
679+
uint32_t& maxEntriesToEvict, uint32_t ledgerSeq,
680+
medida::Counter& entriesEvictedCounter,
681+
medida::Counter& bytesScannedForEvictionCounter,
682+
std::optional<EvictionStatistics>& stats) const
846683
{
847684
ZoneScoped;
848685
if (isEmpty())
@@ -857,7 +694,8 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
857694
return true;
858695
}
859696

860-
auto& stream = getStream();
697+
XDRInputFileStream stream{};
698+
stream.open(mFilename);
861699
stream.seek(iter.bucketFileOffset);
862700

863701
BucketEntry be;
@@ -898,10 +736,10 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
898736
if (shouldEvict())
899737
{
900738
ZoneNamedN(evict, "evict entry", true);
901-
if (metrics.has_value())
739+
if (stats.has_value())
902740
{
903-
++metrics->numEntriesEvicted;
904-
metrics->evictedEntriesAgeSum +=
741+
++stats->numEntriesEvicted;
742+
stats->evictedEntriesAgeSum +=
905743
ledgerSeq - liveUntilLedger;
906744
}
907745

src/bucket/Bucket.h

+16-45
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace stellar
3838
class AbstractLedgerTxn;
3939
class Application;
4040
class BucketManager;
41-
struct EvictionMetrics;
41+
struct EvictionStatistics;
4242

4343
class Bucket : public std::enable_shared_from_this<Bucket>,
4444
public NonMovableOrCopyable
@@ -49,22 +49,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
4949

5050
std::unique_ptr<BucketIndex const> mIndex{};
5151

52-
// Lazily-constructed and retained for read path.
53-
std::unique_ptr<XDRInputFileStream> mStream;
54-
5552
// Returns index, throws if index not yet initialized
5653
BucketIndex const& getIndex() const;
5754

58-
// Returns (lazily-constructed) file stream for bucket file. Note
59-
// this might be in some random position left over from a previous read --
60-
// must be seek()'ed before use.
61-
XDRInputFileStream& getStream();
62-
63-
// Loads the bucket entry for LedgerKey k. Starts at file offset pos and
64-
// reads until key is found or the end of the page.
65-
std::optional<BucketEntry>
66-
getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);
67-
6855
static std::string randomFileName(std::string const& tmpDir,
6956
std::string ext);
7057

@@ -98,25 +85,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
9885
// Sets index, throws if index is already set
9986
void setIndex(std::unique_ptr<BucketIndex const>&& index);
10087

101-
// Loads bucket entry for LedgerKey k.
102-
std::optional<BucketEntry> getBucketEntry(LedgerKey const& k);
103-
104-
// Loads LedgerEntry's for given keys. When a key is found, the
105-
// entry is added to result and the key is removed from keys.
106-
void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
107-
std::vector<LedgerEntry>& result);
108-
109-
// Loads all poolshare trustlines for the given account. Trustlines are
110-
// stored with their corresponding liquidity pool key in
111-
// liquidityPoolKeyToTrustline. All liquidity pool keys corresponding to
112-
// loaded trustlines are also reduntantly stored in liquidityPoolKeys.
113-
// If a trustline key is in deadTrustlines, it is not loaded. Whenever a
114-
// dead trustline is found, its key is added to deadTrustlines.
115-
void loadPoolShareTrustLinessByAccount(
116-
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
117-
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
118-
LedgerKeySet& liquidityPoolKeys);
119-
12088
// At version 11, we added support for INITENTRY and METAENTRY. Before this
12189
// we were only supporting LIVEENTRY and DEADENTRY.
12290
static constexpr ProtocolVersion
@@ -137,18 +105,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
137105
static std::string randomBucketName(std::string const& tmpDir);
138106
static std::string randomBucketIndexName(std::string const& tmpDir);
139107

140-
// Returns false if eof reached, true otherwise. Modifies iter as the bucket
141-
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
142-
// after this function returns:
143-
// bytesToScan -= amount_bytes_scanned
144-
// maxEntriesToEvict -= entries_evicted
145-
bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
146-
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
147-
uint32_t ledgerSeq,
148-
medida::Counter& entriesEvictedCounter,
149-
medida::Counter& bytesScannedForEvictionCounter,
150-
std::optional<EvictionMetrics>& metrics);
151-
152108
#ifdef BUILD_TESTS
153109
// "Applies" the bucket to the database. For each entry in the bucket,
154110
// if the entry is init or live, creates or updates the corresponding
@@ -164,6 +120,19 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
164120

165121
#endif // BUILD_TESTS
166122

123+
// Returns false if eof reached, true otherwise. Modifies iter as the bucket
124+
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
125+
// after this function returns:
126+
// bytesToScan -= amount_bytes_scanned
127+
// maxEntriesToEvict -= entries_evicted
128+
bool
129+
scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, EvictionIterator& iter,
130+
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
131+
uint32_t ledgerSeq,
132+
medida::Counter& entriesEvictedCounter,
133+
medida::Counter& bytesScannedForEvictionCounter,
134+
std::optional<EvictionStatistics>& stats) const;
135+
167136
// Create a fresh bucket from given vectors of init (created) and live
168137
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
169138
// be sorted, hashed, and adopted in the provided BucketManager.
@@ -196,5 +165,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
196165
static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
197166
static uint32_t
198167
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);
168+
169+
friend class SearchableBucketSnapshot;
199170
};
200171
}

0 commit comments

Comments
 (0)