Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bucket cleanup #4093

Merged
merged 4 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,33 @@ Bucket::Bucket()
{
}

// Creates a new XDRInputFileStream and opens it on this bucket's file
// (mFilename). The bucket must have a non-empty filename; this is enforced
// with releaseAssertOrThrow. Ownership of the stream passes to the caller.
std::unique_ptr<XDRInputFileStream>
Bucket::openStream()
{
    releaseAssertOrThrow(!mFilename.empty());
    auto streamPtr = std::make_unique<XDRInputFileStream>();
    streamPtr->open(mFilename.string());
    // Return the local by name: wrapping it in std::move would inhibit copy
    // elision (NRVO) and gains nothing, since a returned local is already
    // treated as an rvalue.
    return streamPtr;
}

XDRInputFileStream&
Bucket::getStream()
Bucket::getIndexStream()
{
if (!mStream)
if (!mIndexStream)
{
mStream = std::make_unique<XDRInputFileStream>();
releaseAssertOrThrow(!mFilename.empty());
mStream->open(mFilename.string());
mIndexStream = openStream();
}
return *mStream;
return *mIndexStream;
}

// Returns the stream used for eviction scans, opening it via openStream()
// the first time it is requested and reusing the same stream afterwards.
XDRInputFileStream&
Bucket::getEvictionStream()
{
    // Fast path: stream already opened by an earlier call.
    if (mEvictionStream)
    {
        return *mEvictionStream;
    }

    mEvictionStream = openStream();
    return *mEvictionStream;
}

Hash const&
Expand Down Expand Up @@ -139,15 +156,15 @@ void
Bucket::freeIndex()
{
mIndex.reset(nullptr);
mStream.reset(nullptr);
mIndexStream.reset(nullptr);
}

std::optional<BucketEntry>
Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos,
size_t pageSize)
{
ZoneScoped;
auto& stream = getStream();
auto& stream = getIndexStream();
stream.seek(pos);

BucketEntry be;
Expand Down Expand Up @@ -221,7 +238,7 @@ Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,

void
Bucket::loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
AccountID const& accountID, UnorderedSet<LedgerKey>& seenTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys)
{
Expand All @@ -238,16 +255,16 @@ Bucket::loadPoolShareTrustLinessByAccount(
// Get upper and lower bound for poolshare trustline range associated
// with this account
auto searchRange = getIndex().getPoolshareTrustlineRange(accountID);
if (searchRange.first == 0)
if (!searchRange)
{
// No poolshare trustlines, exit
return;
}

BucketEntry be;
auto& stream = getStream();
stream.seek(searchRange.first);
while (stream && stream.pos() < searchRange.second && stream.readOne(be))
auto& stream = getIndexStream();
stream.seek(searchRange->first);
while (stream && stream.pos() < searchRange->second && stream.readOne(be))
{
LedgerEntry entry;
switch (be.type())
Expand All @@ -265,7 +282,7 @@ Bucket::loadPoolShareTrustLinessByAccount(
// later
if (trustlineCheck(key))
{
deadTrustlines.emplace(key);
seenTrustlines.emplace(key);
}
continue;
}
Expand All @@ -275,10 +292,11 @@ Bucket::loadPoolShareTrustLinessByAccount(
}

// If this is a pool share trustline that matches the accountID and
// is not shadowed, add it to results
// is the newest version of the key, add it to results
if (trustlineCheck(entry.data) &&
deadTrustlines.find(LedgerEntryKey(entry)) == deadTrustlines.end())
seenTrustlines.find(LedgerEntryKey(entry)) == seenTrustlines.end())
{
seenTrustlines.emplace(LedgerEntryKey(entry));
auto const& poolshareID =
entry.data.trustLine().asset.liquidityPoolID();

Expand Down Expand Up @@ -859,7 +877,7 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
return true;
}

auto& stream = getStream();
auto& stream = getEvictionStream();
stream.seek(iter.bucketFileOffset);

BucketEntry be;
Expand All @@ -871,12 +889,6 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
if (isTemporaryEntry(le.data))
{
ZoneNamedN(maybeEvict, "maybe evict entry", true);
// All Buckets maintain a single stream object. This means
// that if an TTLEntry being loaded exists in the
// same bucket as the entry being evicted, the stream may be
// modified, so we must seek back to the starting position
// after any call to load
auto initialStreamPos = stream.pos();

auto ttlKey = getTTLKey(le);
uint32_t liveUntilLedger = 0;
Expand Down Expand Up @@ -912,8 +924,6 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
entriesEvictedCounter.inc();
--remainingEntriesToEvict;
}

stream.seek(initialStreamPos);
}
}

Expand Down
25 changes: 18 additions & 7 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,33 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

std::unique_ptr<BucketIndex const> mIndex{};

// Lazily-constructed and retained for read path.
std::unique_ptr<XDRInputFileStream> mStream;
// Lazily-constructed and retained for read path, one for BucketListDB reads
// and one for eviction scans
std::unique_ptr<XDRInputFileStream> mIndexStream;
std::unique_ptr<XDRInputFileStream> mEvictionStream;

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;

// Returns (lazily-constructed) file stream for bucket file. Note
// Returns (lazily-constructed) file stream for bucketDB search. Note
// this might be in some random position left over from a previous read --
// must be seek()'ed before use.
XDRInputFileStream& getStream();
XDRInputFileStream& getIndexStream();

// Returns (lazily-constructed) file stream for eviction scans. Unlike the
// indexStream, this should retain its position in-between calls. However, a
// node performing catchup or joining the network may need to begin evicting
// mid-bucket, so this stream should still be seeked to the proper position
// before reading.
XDRInputFileStream& getEvictionStream();

// Loads the bucket entry for LedgerKey k. Starts at file offset pos and
// reads until key is found or the end of the page.
std::optional<BucketEntry>
getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);

std::unique_ptr<XDRInputFileStream> openStream();

static std::string randomFileName(std::string const& tmpDir,
std::string ext);

Expand Down Expand Up @@ -110,10 +121,10 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// stored with their corresponding liquidity pool key in
// liquidityPoolKeyToTrustline. All liquidity pool keys corresponding to
// loaded trustlines are also redundantly stored in liquidityPoolKeys.
// If a trustline key is in deadTrustlines, it is not loaded. Whenever a
// dead trustline is found, its key is added to deadTrustlines.
// If a trustline key is in seenTrustlines, it is not loaded. Whenever a
// matching trustline is found, whether dead or live (and therefore loaded),
// its key is added to seenTrustlines.
void loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
AccountID const& accountID, UnorderedSet<LedgerKey>& seenTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys);

Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ class BucketIndex : public NonMovableOrCopyable

// Returns lower bound and upper bound for poolshare trustline entry
// positions associated with the given accountID. If no trustlines found,
// returns std::pair<0, 0>
virtual std::pair<std::streamoff, std::streamoff>
// returns nullopt
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getPoolshareTrustlineRange(AccountID const& accountID) const = 0;

// Returns page size for index. IndividualIndex returns 0 for page size
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ BucketIndexImpl<IndexT>::scan(Iterator start, LedgerKey const& k) const
}

template <class IndexT>
std::pair<std::streamoff, std::streamoff>
std::optional<std::pair<std::streamoff, std::streamoff>>
BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
AccountID const& accountID) const
{
Expand All @@ -430,7 +430,7 @@ BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
lower_bound_pred<typename IndexT::value_type>);
if (startIter == mData.keysToOffset.end())
{
return {};
return std::nullopt;
}

auto endIter =
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
virtual std::pair<std::optional<std::streamoff>, Iterator>
scan(Iterator start, LedgerKey const& k) const override;

virtual std::pair<std::streamoff, std::streamoff>
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getPoolshareTrustlineRange(AccountID const& accountID) const override;

virtual std::streamoff
Expand Down
34 changes: 19 additions & 15 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,17 +443,16 @@ BucketList::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const

std::vector<LedgerEntry>
BucketList::loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
Asset const& asset,
Config const& cfg) const
Asset const& asset) const
{
ZoneScoped;
UnorderedMap<LedgerKey, LedgerEntry> liquidityPoolToTrustline;
UnorderedSet<LedgerKey> deadTrustlines;
UnorderedSet<LedgerKey> seenTrustlines;
LedgerKeySet liquidityPoolKeysToSearch;

// First get all the poolshare trustlines for the given account
auto trustLineLoop = [&](std::shared_ptr<Bucket> b) {
b->loadPoolShareTrustLinessByAccount(accountID, deadTrustlines,
b->loadPoolShareTrustLinessByAccount(accountID, seenTrustlines,
liquidityPoolToTrustline,
liquidityPoolKeysToSearch);
return false; // continue
Expand Down Expand Up @@ -890,6 +889,22 @@ BucketList::scanForEviction(Application& app, AbstractLedgerTxn& ltx,
auto initialLevel = evictionIter.bucketListLevel;
auto initialIsCurr = evictionIter.isCurrBucket;
auto b = getBucketFromIter(evictionIter);

// Check to see if we can finish scanning the bucket before it receives
// an update
auto period = bucketUpdatePeriod(evictionIter.bucketListLevel,
evictionIter.isCurrBucket);

// Ledgers remaining until the current Bucket changes
auto ledgersRemainingToScanBucket = period - (ledgerSeq % period);
auto bytesRemaining = b->getSize() - evictionIter.bucketFileOffset;
if (ledgersRemainingToScanBucket * scanSize < bytesRemaining)
{
CLOG_WARNING(Bucket,
"Bucket too large for current eviction scan size.");
counters.incompleteBucketScan.inc();
}

while (!b->scanForEviction(
ltx, evictionIter, scanSize, maxEntriesToEvict, ledgerSeq,
counters.entriesEvicted, counters.bytesScannedForEviction,
Expand Down Expand Up @@ -946,17 +961,6 @@ BucketList::scanForEviction(Application& app, AbstractLedgerTxn& ltx,
}

b = getBucketFromIter(evictionIter);

// Check to see if we can finish scanning the new bucket before it
// receives an update
auto period = bucketUpdatePeriod(evictionIter.bucketListLevel,
evictionIter.isCurrBucket);
if (period * scanSize < b->getSize())
{
CLOG_WARNING(
Bucket, "Bucket too large for current eviction scan size.");
counters.incompleteBucketScan.inc();
}
}

networkConfig.updateEvictionIterator(ltx, evictionIter);
Expand Down
3 changes: 1 addition & 2 deletions src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ class BucketList

std::vector<LedgerEntry>
loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
Asset const& asset,
Config const& cfg) const;
Asset const& asset) const;

std::vector<InflationWinner> loadInflationWinners(size_t maxWinners,
int64_t minBalance) const;
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,8 @@ BucketManagerImpl::loadPoolShareTrustLinesByAccountAndAsset(
// This query needs to do a linear scan of certain regions of the
// BucketList, so the number of entries loaded is meaningless
auto timer = recordBulkLoadMetrics("poolshareTrustlines", 0).TimeScope();
return mBucketList->loadPoolShareTrustLinesByAccountAndAsset(
accountID, asset, getConfig());
return mBucketList->loadPoolShareTrustLinesByAccountAndAsset(accountID,
asset);
}

std::vector<InflationWinner>
Expand Down
Loading
Loading