Skip to content

Commit 004a4ba

Browse files
committed
Apply Buckets in-order when BucketListDB enabled
1 parent 7b59b05 commit 004a4ba

11 files changed

+324
-74
lines changed

src/bucket/Bucket.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ Bucket::isIndexed() const
5353
return static_cast<bool>(mIndex);
5454
}
5555

56+
std::optional<std::pair<std::streamoff, std::streamoff>>
57+
Bucket::getOfferRange() const
58+
{
59+
return getIndex().getOfferRange();
60+
}
61+
5662
void
5763
Bucket::setIndex(std::unique_ptr<BucketIndex const>&& index)
5864
{
@@ -135,13 +141,14 @@ Bucket::apply(Application& app) const
135141
{
136142
ZoneScoped;
137143

144+
std::unordered_set<LedgerKey> emptySet;
138145
BucketApplicator applicator(
139146
app, app.getConfig().LEDGER_PROTOCOL_VERSION,
140147
0 /*set to 0 so we always load from the parent to check state*/,
141148
0 /*set to a level that's not the bottom so we don't treat live entries
142149
as init*/
143150
,
144-
shared_from_this(), [](LedgerEntryType) { return true; });
151+
shared_from_this(), [](LedgerEntryType) { return true; }, emptySet);
145152
BucketApplicator::Counters counters(app.getClock().now());
146153
while (applicator)
147154
{

src/bucket/Bucket.h

+5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
8282
// Returns true if bucket is indexed, false otherwise
8383
bool isIndexed() const;
8484

85+
// Returns [lowerBound, upperBound) of file offsets for all offers in the
86+
// bucket, or std::nullopt if no offers exist
87+
std::optional<std::pair<std::streamoff, std::streamoff>>
88+
getOfferRange() const;
89+
8590
// Sets index, throws if index is already set
8691
void setIndex(std::unique_ptr<BucketIndex const>&& index);
8792

src/bucket/BucketApplicator.cpp

+59-2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ BucketApplicator::BucketApplicator(Application& app,
2121
uint32_t minProtocolVersionSeen,
2222
uint32_t level,
2323
std::shared_ptr<Bucket const> bucket,
24-
std::function<bool(LedgerEntryType)> filter)
24+
std::function<bool(LedgerEntryType)> filter,
25+
std::unordered_set<LedgerKey>& seenKeys)
2526
: mApp(app)
2627
, mMaxProtocolVersion(maxProtocolVersion)
2728
, mMinProtocolVersionSeen(minProtocolVersionSeen)
2829
, mLevel(level)
2930
, mBucketIter(bucket)
3031
, mEntryTypeFilter(filter)
32+
, mSeenKeys(seenKeys)
3133
{
3234
auto protocolVersion = mBucketIter.getMetadata().ledgerVersion;
3335
if (protocolVersion > mMaxProtocolVersion)
@@ -37,11 +39,33 @@ BucketApplicator::BucketApplicator(Application& app,
3739
"bucket protocol version {:d} exceeds maxProtocolVersion {:d}"),
3840
protocolVersion, mMaxProtocolVersion));
3941
}
42+
43+
// When BucketListDB is enabled, only offer entries are applied
44+
if (mApp.getConfig().isUsingBucketListDB() && !bucket->isEmpty())
45+
{
46+
auto offsetOp = bucket->getOfferRange();
47+
if (offsetOp)
48+
{
49+
auto [lowOffset, highOffset] = *offsetOp;
50+
mBucketIter.seek(lowOffset);
51+
mUpperBoundOffset = highOffset;
52+
}
53+
else
54+
{
55+
// No offers in Bucket
56+
mOffersRemaining = false;
57+
}
58+
}
4059
}
4160

4261
BucketApplicator::operator bool() const
4362
{
44-
return (bool)mBucketIter;
63+
// There is more work to do (i.e. (bool) *this == true) iff:
64+
// 1. The underlying bucket iterator is not EOF and
65+
// 2. Either BucketListDB is not enabled (so we must apply all entry types)
66+
// or BucketListDB is enabled and we have offers still remaining.
67+
return static_cast<bool>(mBucketIter) &&
68+
(!mApp.getConfig().isUsingBucketListDB() || mOffersRemaining);
4569
}
4670

4771
size_t
@@ -99,11 +123,43 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
99123

100124
for (; mBucketIter; ++mBucketIter)
101125
{
126+
// Note: mUpperBoundOffset is not inclusive. However, mBucketIter.pos()
127+
// returns the file offset at the end of the currently loaded entry.
128+
// This means we must read until pos is strictly greater than the upper
129+
// bound so that we don't skip the last offer in the range.
130+
auto isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();
131+
if (isUsingBucketListDB && mBucketIter.pos() > mUpperBoundOffset)
132+
{
133+
mOffersRemaining = false;
134+
break;
135+
}
136+
102137
BucketEntry const& e = *mBucketIter;
103138
Bucket::checkProtocolLegality(e, mMaxProtocolVersion);
104139

105140
if (shouldApplyEntry(mEntryTypeFilter, e))
106141
{
142+
if (isUsingBucketListDB)
143+
{
144+
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
145+
{
146+
auto [_, wasInserted] =
147+
mSeenKeys.emplace(LedgerEntryKey(e.liveEntry()));
148+
149+
// Skip seen keys
150+
if (!wasInserted)
151+
{
152+
continue;
153+
}
154+
}
155+
else
156+
{
157+
// Only apply INIT and LIVE entries
158+
mSeenKeys.emplace(e.deadEntry());
159+
continue;
160+
}
161+
}
162+
107163
counters.mark(e);
108164

109165
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
@@ -148,6 +204,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
148204
}
149205
else
150206
{
207+
releaseAssertOrThrow(!isUsingBucketListDB);
151208
if (protocolVersionIsBefore(
152209
mMinProtocolVersionSeen,
153210
Bucket::

src/bucket/BucketApplicator.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "bucket/Bucket.h"
88
#include "bucket/BucketInputIterator.h"
9+
#include "ledger/LedgerHashUtils.h"
910
#include "util/Timer.h"
1011
#include "util/XDRStream.h"
1112
#include <memory>
@@ -28,6 +29,9 @@ class BucketApplicator
2829
BucketInputIterator mBucketIter;
2930
size_t mCount{0};
3031
std::function<bool(LedgerEntryType)> mEntryTypeFilter;
32+
std::unordered_set<LedgerKey>& mSeenKeys;
33+
std::streamoff mUpperBoundOffset;
34+
bool mOffersRemaining{true};
3135

3236
public:
3337
class Counters
@@ -63,10 +67,16 @@ class BucketApplicator
6367
VirtualClock::time_point now);
6468
};
6569

70+
// When BucketListDB is enabled, only offers are applied. Additionally, an
71+
// offer is only applied iff:
72+
// 1. It is of type INITENTRY or LIVEENTRY
73+
// 2. Its LedgerKey is not already in seenKeys
74+
// When BucketListDB is enabled, each offer key read is added to seenKeys
6675
BucketApplicator(Application& app, uint32_t maxProtocolVersion,
6776
uint32_t minProtocolVersionSeen, uint32_t level,
6877
std::shared_ptr<Bucket const> bucket,
69-
std::function<bool(LedgerEntryType)> filter);
78+
std::function<bool(LedgerEntryType)> filter,
79+
std::unordered_set<LedgerKey>& seenKeys);
7080
operator bool() const;
7181
size_t advance(Counters& counters);
7282

src/bucket/BucketIndex.h

+5
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ class BucketIndex : public NonMovableOrCopyable
118118
virtual std::vector<PoolID> const&
119119
getPoolIDsByAsset(Asset const& asset) const = 0;
120120

121+
// Returns lower bound and upper bound for offer entry positions in the
122+
// given bucket, or std::nullopt if no offers exist
123+
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
124+
getOfferRange() const = 0;
125+
121126
// Returns page size for index. IndividualIndex returns 0 for page size
122127
virtual std::streamoff getPageSize() const = 0;
123128

src/bucket/BucketIndexImpl.cpp

+38-12
Original file line numberDiff line numberDiff line change
@@ -450,16 +450,9 @@ BucketIndexImpl<IndexT>::scan(Iterator start, LedgerKey const& k) const
450450

451451
template <class IndexT>
452452
std::optional<std::pair<std::streamoff, std::streamoff>>
453-
BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
454-
AccountID const& accountID) const
453+
BucketIndexImpl<IndexT>::getOffsetBounds(LedgerKey const& lowerBound,
454+
LedgerKey const& upperBound) const
455455
{
456-
// Get the smallest and largest possible trustline keys for the given
457-
// accountID
458-
auto upperBound = getDummyPoolShareTrustlineKey(
459-
accountID, std::numeric_limits<uint8_t>::max());
460-
auto lowerBound = getDummyPoolShareTrustlineKey(
461-
accountID, std::numeric_limits<uint8_t>::min());
462-
463456
// Get the index iterators for the bounds
464457
auto startIter = std::lower_bound(
465458
mData.keysToOffset.begin(), mData.keysToOffset.end(), lowerBound,
@@ -469,9 +462,9 @@ BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
469462
return std::nullopt;
470463
}
471464

472-
auto endIter =
473-
std::upper_bound(startIter, mData.keysToOffset.end(), upperBound,
474-
upper_bound_pred<typename IndexT::value_type>);
465+
auto endIter = std::upper_bound(
466+
std::next(startIter), mData.keysToOffset.end(), upperBound,
467+
upper_bound_pred<typename IndexT::value_type>);
475468

476469
// Get file offsets based on lower and upper bound iterators
477470
std::streamoff startOff = startIter->second;
@@ -501,6 +494,39 @@ BucketIndexImpl<IndexT>::getPoolIDsByAsset(Asset const& asset) const
501494
return iter->second;
502495
}
503496

497+
template <class IndexT>
498+
std::optional<std::pair<std::streamoff, std::streamoff>>
499+
BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
500+
AccountID const& accountID) const
501+
{
502+
// Get the smallest and largest possible trustline keys for the given
503+
// accountID
504+
auto upperBound = getDummyPoolShareTrustlineKey(
505+
accountID, std::numeric_limits<uint8_t>::max());
506+
auto lowerBound = getDummyPoolShareTrustlineKey(
507+
accountID, std::numeric_limits<uint8_t>::min());
508+
509+
return getOffsetBounds(lowerBound, upperBound);
510+
}
511+
512+
template <class IndexT>
513+
std::optional<std::pair<std::streamoff, std::streamoff>>
514+
BucketIndexImpl<IndexT>::getOfferRange() const
515+
{
516+
// Get the smallest and largest possible offer keys
517+
LedgerKey upperBound(OFFER);
518+
upperBound.offer().sellerID.ed25519().fill(
519+
std::numeric_limits<uint8_t>::max());
520+
upperBound.offer().offerID = std::numeric_limits<int64_t>::max();
521+
522+
LedgerKey lowerBound(OFFER);
523+
lowerBound.offer().sellerID.ed25519().fill(
524+
std::numeric_limits<uint8_t>::min());
525+
lowerBound.offer().offerID = std::numeric_limits<int64_t>::min();
526+
527+
return getOffsetBounds(lowerBound, upperBound);
528+
}
529+
504530
#ifdef BUILD_TESTS
505531
template <class IndexT>
506532
bool

src/bucket/BucketIndexImpl.h

+9
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
6666
// Saves index to disk, overwriting any preexisting file for this index
6767
void saveToDisk(BucketManager& bm, Hash const& hash) const;
6868

69+
// Returns [lowFileOffset, highFileOffset) that contain the key ranges
70+
// [lowerBound, upperBound]. If no file offsets exist, returns std::nullopt
71+
std::optional<std::pair<std::streamoff, std::streamoff>>
72+
getOffsetBounds(LedgerKey const& lowerBound,
73+
LedgerKey const& upperBound) const;
74+
6975
friend BucketIndex;
7076

7177
public:
@@ -81,6 +87,9 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
8187
virtual std::vector<PoolID> const&
8288
getPoolIDsByAsset(Asset const& asset) const override;
8389

90+
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
91+
getOfferRange() const override;
92+
8493
virtual std::streamoff
8594
getPageSize() const override
8695
{

src/bucket/BucketInputIterator.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ BucketInputIterator::loadEntry()
5252
}
5353
}
5454

55-
size_t
55+
std::streamoff
5656
BucketInputIterator::pos()
5757
{
5858
return mIn.pos();
@@ -124,4 +124,11 @@ BucketInputIterator::operator++()
124124
}
125125
return *this;
126126
}
127+
128+
void
129+
BucketInputIterator::seek(std::streamoff offset)
130+
{
131+
mIn.seek(offset);
132+
loadEntry();
133+
}
127134
}

src/bucket/BucketInputIterator.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class BucketInputIterator
5252

5353
BucketInputIterator& operator++();
5454

55-
size_t pos();
55+
std::streamoff pos();
5656
size_t size() const;
57+
void seek(std::streamoff offset);
5758
};
5859
}

0 commit comments

Comments
 (0)