Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor for threadsafe BucketListDB #4172

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 11 additions & 173 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,6 @@ Bucket::Bucket()
{
}

XDRInputFileStream&
Bucket::getStream()
{
if (!mStream)
{
mStream = std::make_unique<XDRInputFileStream>();
releaseAssertOrThrow(!mFilename.empty());
mStream->open(mFilename.string());
}
return *mStream;
}

Hash const&
Bucket::getHash() const
{
Expand Down Expand Up @@ -139,157 +127,6 @@ void
Bucket::freeIndex()
{
mIndex.reset(nullptr);
mStream.reset(nullptr);
}

std::optional<BucketEntry>
Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos,
size_t pageSize)
{
ZoneScoped;
auto& stream = getStream();
stream.seek(pos);

BucketEntry be;
if (pageSize == 0)
{
if (stream.readOne(be))
{
return std::make_optional(be);
}
}
else if (stream.readPage(be, k, pageSize))
{
return std::make_optional(be);
}

// Mark entry miss for metrics
getIndex().markBloomMiss();
return std::nullopt;
}

std::optional<BucketEntry>
Bucket::getBucketEntry(LedgerKey const& k)
{
ZoneScoped;
auto pos = getIndex().lookup(k);
if (pos.has_value())
{
return getEntryAtOffset(k, pos.value(), getIndex().getPageSize());
}

return std::nullopt;
}

// When searching for an entry, BucketList calls this function on every bucket.
// Since the input is sorted, we do a binary search for the first key in keys.
// If we find the entry, we remove the found key from keys so that later buckets
// do not load shadowed entries. If we don't find the entry, we do not remove it
// from keys so that it will be searched for again at a lower level.
void
Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result)
{
ZoneScoped;

auto currKeyIt = keys.begin();
auto const& index = getIndex();
auto indexIter = index.begin();
while (currKeyIt != keys.end() && indexIter != index.end())
{
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
indexIter = newIndexIter;
if (offOp)
{
auto entryOp =
getEntryAtOffset(*currKeyIt, *offOp, getIndex().getPageSize());
if (entryOp)
{
if (entryOp->type() != DEADENTRY)
{
result.push_back(entryOp->liveEntry());
}

currKeyIt = keys.erase(currKeyIt);
continue;
}
}

++currKeyIt;
}
}

void
Bucket::loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys)
{
ZoneScoped;

// Takes a LedgerKey or LedgerEntry::_data_t, returns true if entry is a
// poolshare trusline for the given accountID
auto trustlineCheck = [&accountID](auto const& entry) {
return entry.type() == TRUSTLINE &&
entry.trustLine().asset.type() == ASSET_TYPE_POOL_SHARE &&
entry.trustLine().accountID == accountID;
};

// Get upper and lower bound for poolshare trustline range associated
// with this account
auto searchRange = getIndex().getPoolshareTrustlineRange(accountID);
if (searchRange.first == 0)
{
// No poolshare trustlines, exit
return;
}

BucketEntry be;
auto& stream = getStream();
stream.seek(searchRange.first);
while (stream && stream.pos() < searchRange.second && stream.readOne(be))
{
LedgerEntry entry;
switch (be.type())
{
case LIVEENTRY:
case INITENTRY:
entry = be.liveEntry();
break;
case DEADENTRY:
{
auto key = be.deadEntry();

// If we find a valid trustline key and we have not seen the
// key yet, mark it as dead so we do not load a shadowed version
// later
if (trustlineCheck(key))
{
deadTrustlines.emplace(key);
}
continue;
}
case METAENTRY:
default:
throw std::invalid_argument("Indexed METAENTRY");
}

// If this is a pool share trustline that matches the accountID and
// is not shadowed, add it to results
if (trustlineCheck(entry.data) &&
deadTrustlines.find(LedgerEntryKey(entry)) == deadTrustlines.end())
{
auto const& poolshareID =
entry.data.trustLine().asset.liquidityPoolID();

LedgerKey key;
key.type(LIQUIDITY_POOL);
key.liquidityPool().liquidityPoolID = poolshareID;

liquidityPoolKeyToTrustline.emplace(key, entry);
liquidityPoolKeys.emplace(key);
}
}
}

#ifdef BUILD_TESTS
Expand Down Expand Up @@ -837,12 +674,12 @@ mergeCasesWithEqualKeys(MergeCounters& mc, BucketInputIterator& oi,
}

bool
Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics)
Bucket::scanForEvictionLegacySQL(
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint64_t& bytesToScan,
uint32_t& maxEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const
{
ZoneScoped;
if (isEmpty())
Expand All @@ -857,7 +694,8 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
return true;
}

auto& stream = getStream();
XDRInputFileStream stream{};
stream.open(mFilename);
stream.seek(iter.bucketFileOffset);

BucketEntry be;
Expand Down Expand Up @@ -898,10 +736,10 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
if (shouldEvict())
{
ZoneNamedN(evict, "evict entry", true);
if (metrics.has_value())
if (stats.has_value())
{
++metrics->numEntriesEvicted;
metrics->evictedEntriesAgeSum +=
++stats->numEntriesEvicted;
stats->evictedEntriesAgeSum +=
ledgerSeq - liveUntilLedger;
}

Expand Down
61 changes: 16 additions & 45 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace stellar
class AbstractLedgerTxn;
class Application;
class BucketManager;
struct EvictionMetrics;
struct EvictionStatistics;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
Expand All @@ -49,22 +49,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

std::unique_ptr<BucketIndex const> mIndex{};

// Lazily-constructed and retained for read path.
std::unique_ptr<XDRInputFileStream> mStream;

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;

// Returns (lazily-constructed) file stream for bucket file. Note
// this might be in some random position left over from a previous read --
// must be seek()'ed before use.
XDRInputFileStream& getStream();

// Loads the bucket entry for LedgerKey k. Starts at file offset pos and
// reads until key is found or the end of the page.
std::optional<BucketEntry>
getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);

static std::string randomFileName(std::string const& tmpDir,
std::string ext);

Expand Down Expand Up @@ -98,25 +85,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);

// Loads bucket entry for LedgerKey k.
std::optional<BucketEntry> getBucketEntry(LedgerKey const& k);

// Loads LedgerEntry's for given keys. When a key is found, the
// entry is added to result and the key is removed from keys.
void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result);

// Loads all poolshare trustlines for the given account. Trustlines are
// stored with their corresponding liquidity pool key in
// liquidityPoolKeyToTrustline. All liquidity pool keys corresponding to
// loaded trustlines are also reduntantly stored in liquidityPoolKeys.
// If a trustline key is in deadTrustlines, it is not loaded. Whenever a
// dead trustline is found, its key is added to deadTrustlines.
void loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& deadTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys);

// At version 11, we added support for INITENTRY and METAENTRY. Before this
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr ProtocolVersion
Expand All @@ -137,18 +105,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

// Returns false if eof reached, true otherwise. Modifies iter as the bucket
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
// after this function returns:
// bytesToScan -= amount_bytes_scanned
// maxEntriesToEvict -= entries_evicted
bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics);

#ifdef BUILD_TESTS
// "Applies" the bucket to the database. For each entry in the bucket,
// if the entry is init or live, creates or updates the corresponding
Expand All @@ -164,6 +120,19 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

#endif // BUILD_TESTS

// Returns false if eof reached, true otherwise. Modifies iter as the bucket
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
// after this function returns:
// bytesToScan -= amount_bytes_scanned
// maxEntriesToEvict -= entries_evicted
bool
scanForEvictionLegacySQL(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint64_t& bytesToScan, uint32_t& maxEntriesToEvict,
uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const;

// Create a fresh bucket from given vectors of init (created) and live
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
// be sorted, hashed, and adopted in the provided BucketManager.
Expand Down Expand Up @@ -196,5 +165,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
static uint32_t
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);

friend class SearchableBucketSnapshot;
};
}
Loading