forked from stellar/stellar-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBucket.h
212 lines (175 loc) · 8.71 KB
/
Bucket.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#pragma once
// Copyright 2015 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "bucket/BucketIndex.h"
#include "crypto/Hex.h"
#include "overlay/StellarXDR.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "util/UnorderedMap.h"
#include "util/UnorderedSet.h"
#include "util/XDRStream.h"
#include <optional>
#include <string>
namespace medida
{
class Counter;
class Meter;
}
namespace stellar
{
/**
* Bucket is an immutable container for a sorted set of "Entries" (object ID,
* hash, xdr-message tuples) which is designed to be held in a shared_ptr<>
* which is referenced between threads, to minimize copying. It is therefore
* imperative that it be _really_ immutable, not just faking it.
*
* Two buckets can be merged together efficiently (in a single pass): elements
* from the newer bucket overwrite elements from the older bucket, the rest are
* merged in sorted order, and all elements are hashed while being added.
*/
class AbstractLedgerTxn;
class Application;
class BucketManager;
struct EvictionMetrics;
class Bucket : public std::enable_shared_from_this<Bucket>,
               public NonMovableOrCopyable
{
    // Path of the backing file on disk; the empty string for the empty
    // bucket.
    std::filesystem::path const mFilename;

    // Content hash of this bucket; all-zero for the empty bucket.
    Hash const mHash;
    size_t mSize{0};

    // Index over the bucket file; null until setIndex() is called.
    std::unique_ptr<BucketIndex const> mIndex{};

    // Lazily-constructed and retained for read path, one for BucketListDB
    // reads and one for eviction scans.
    std::unique_ptr<XDRInputFileStream> mIndexStream;
    std::unique_ptr<XDRInputFileStream> mEvictionStream;

    // Returns index, throws if index not yet initialized.
    BucketIndex const& getIndex() const;

    // Returns (lazily-constructed) file stream for bucketDB search. Note
    // this might be in some random position left over from a previous read --
    // must be seek()'ed before use.
    XDRInputFileStream& getIndexStream();

    // Returns (lazily-constructed) file stream for eviction scans. Unlike the
    // indexStream, this should retain its position in-between calls. However,
    // a node performing catchup or joining the network may need to begin
    // evicting mid-bucket, so this stream should still be seeked to the proper
    // position before reading.
    XDRInputFileStream& getEvictionStream();

    // Loads the bucket entry for LedgerKey k. Starts at file offset pos and
    // reads until key is found or the end of the page.
    std::optional<BucketEntry>
    getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);

    // Opens a fresh input stream over this bucket's file.
    std::unique_ptr<XDRInputFileStream> openStream();

    // Produces a random file name inside tmpDir with the given extension;
    // shared by randomBucketName and randomBucketIndexName.
    static std::string randomFileName(std::string const& tmpDir,
                                      std::string ext);

  public:
    // Create an empty bucket. The empty bucket has hash '000000...' and its
    // filename is the empty string.
    Bucket();

    // Construct a bucket with a given filename and hash. Asserts that the file
    // exists, but does not check that the hash is the bucket's hash. Caller
    // needs to ensure that.
    Bucket(std::string const& filename, Hash const& hash,
           std::unique_ptr<BucketIndex const>&& index);

    Hash const& getHash() const;
    std::filesystem::path const& getFilename() const;
    size_t getSize() const;

    // Returns true if a BucketEntry that is key-wise identical to the given
    // BucketEntry exists in the bucket. For testing.
    bool containsBucketIdentity(BucketEntry const& id) const;

    bool isEmpty() const;

    // Delete index and close file stream.
    void freeIndex();

    // Returns true if bucket is indexed, false otherwise.
    bool isIndexed() const;

    // Sets index, throws if index is already set.
    void setIndex(std::unique_ptr<BucketIndex const>&& index);

    // Loads bucket entry for LedgerKey k.
    std::optional<BucketEntry> getBucketEntry(LedgerKey const& k);

    // Loads LedgerEntry's for given keys. When a key is found, the
    // entry is added to result and the key is removed from keys.
    void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
                  std::vector<LedgerEntry>& result);

    // Loads all poolshare trustlines for the given account. Trustlines are
    // stored with their corresponding liquidity pool key in
    // liquidityPoolKeyToTrustline. All liquidity pool keys corresponding to
    // loaded trustlines are also redundantly stored in liquidityPoolKeys.
    // If a trustline key is in seenTrustlines, it is not loaded. Whenever a
    // dead trustline is found, its key is added to seenTrustlines.
    void loadPoolShareTrustLinessByAccount(
        AccountID const& accountID, UnorderedSet<LedgerKey>& seenTrustlines,
        UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
        LedgerKeySet& liquidityPoolKeys);

    // At version 11, we added support for INITENTRY and METAENTRY. Before this
    // we were only supporting LIVEENTRY and DEADENTRY.
    static constexpr ProtocolVersion
        FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY =
            ProtocolVersion::V_11;
    static constexpr ProtocolVersion FIRST_PROTOCOL_SHADOWS_REMOVED =
        ProtocolVersion::V_12;

    // Throws if the given entry kind is not legal under the given protocol
    // version (e.g. INITENTRY/METAENTRY before V_11).
    static void checkProtocolLegality(BucketEntry const& entry,
                                      uint32_t protocolVersion);

    // Builds the sorted BucketEntry vector for a fresh bucket from the given
    // init/live/dead ledger entries; useInit controls whether initEntries
    // are emitted as INITENTRY or LIVEENTRY.
    static std::vector<BucketEntry>
    convertToBucketEntry(bool useInit,
                         std::vector<LedgerEntry> const& initEntries,
                         std::vector<LedgerEntry> const& liveEntries,
                         std::vector<LedgerKey> const& deadEntries);

    // Random temp names for bucket files and their index files in tmpDir.
    static std::string randomBucketName(std::string const& tmpDir);
    static std::string randomBucketIndexName(std::string const& tmpDir);

    // Returns false if eof reached or if Bucket protocol version < 20, true
    // otherwise. Modifies iter as the bucket is scanned. Also modifies
    // bytesToScan and remainingEntriesToEvict such that after this function
    // returns:
    // bytesToScan -= amount_bytes_scanned
    // remainingEntriesToEvict -= entries_evicted
    bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
                         uint32_t& bytesToScan,
                         uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
                         medida::Counter& entriesEvictedCounter,
                         medida::Counter& bytesScannedForEvictionCounter,
                         std::optional<EvictionMetrics>& metrics);

#ifdef BUILD_TESTS
    // "Applies" the bucket to the database. For each entry in the bucket,
    // if the entry is init or live, creates or updates the corresponding
    // entry in the database (respectively); if the entry is dead (a
    // tombstone), deletes the corresponding entry in the database.
    void apply(Application& app) const;

    BucketIndex const&
    getIndexForTesting() const
    {
        return getIndex();
    }
#endif // BUILD_TESTS

    // Create a fresh bucket from given vectors of init (created) and live
    // (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
    // be sorted, hashed, and adopted in the provided BucketManager.
    static std::shared_ptr<Bucket>
    fresh(BucketManager& bucketManager, uint32_t protocolVersion,
          std::vector<LedgerEntry> const& initEntries,
          std::vector<LedgerEntry> const& liveEntries,
          std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
          asio::io_context& ctx, bool doFsync);

    // Merge two buckets together, producing a fresh one. Entries in
    // `oldBucket` are overridden in the fresh bucket by keywise-equal entries
    // in `newBucket`. Entries are inhibited from the fresh bucket by
    // keywise-equal entries in any of the buckets in the provided `shadows`
    // vector.
    //
    // Each bucket is self-describing in terms of the ledger protocol version
    // it was constructed under, and the merge algorithm adjusts to the maximum
    // of the versions attached to each input or shadow bucket. The provided
    // `maxProtocolVersion` bounds this (for error checking) and should usually
    // be the protocol of the ledger header at which the merge is starting. An
    // exception will be thrown if any provided bucket versions exceed it.
    static std::shared_ptr<Bucket>
    merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
          std::shared_ptr<Bucket> const& oldBucket,
          std::shared_ptr<Bucket> const& newBucket,
          std::vector<std::shared_ptr<Bucket>> const& shadows,
          bool keepDeadEntries, bool countMergeEvents, asio::io_context& ctx,
          bool doFsync);

    // Reads the protocol version the bucket was written under (mutable- and
    // const-pointer overloads).
    static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
    static uint32_t
    getBucketVersion(std::shared_ptr<Bucket const> const& bucket);
};
}