forked from stellar/stellar-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTransactionQueue.h
263 lines (220 loc) · 8.51 KB
/
TransactionQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
#pragma once
// Copyright 2019 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "crypto/SecretKey.h"
#include "herder/TxQueueLimiter.h"
#include "herder/TxSetFrame.h"
#include "ledger/LedgerTxn.h"
#include "transactions/TransactionFrame.h"
#include "util/HashOfHash.h"
#include "util/Timer.h"
#include "util/XDROperators.h"
#include "xdr/Stellar-transaction.h"
#include "util/UnorderedMap.h"
#include "util/UnorderedSet.h"
#include <chrono>
#include <deque>
#include <memory>
#include <vector>
namespace medida
{
class Counter;
class Timer;
}
namespace stellar
{
class Application;
/**
* TransactionQueue keeps received transactions that are valid and have not yet
* been included in a transaction set.
*
* An accountID is in mAccountStates if and only if it is the fee-source or
* sequence-number-source for at least one transaction in the TransactionQueue.
* This invariant is maintained by releaseFeeMaybeEraseAccountState.
*
* Transactions received from the HTTP "tx" endpoint and the overlay network
* should be added by calling tryAdd. If that succeeds, the transaction may be
* removed later in three ways:
* - removeApplied() should be called after transactions are applied. It removes
* the specified transactions, but leaves transactions with subsequent
* sequence numbers in the TransactionQueue. It also resets the age for the
* sequence-number-source of each specified transaction.
* - ban() should be called after transactions become invalid for any reason.
* Banned transactions cannot be added to the TransactionQueue again for a
* banDepth ledgers.
* - shift() should be called after each ledger close, after removeApplied. It
* increases the age for every account that is the sequence-number-source for
* at least one transaction. If the age becomes greater than or equal to
* pendingDepth, all transactions for that source account are banned. It also
* unbans any transactions that have been banned for more than banDepth
* ledgers.
*/
class TransactionQueue
{
public:
static uint64_t const FEE_MULTIPLIER;
enum class AddResult
{
ADD_STATUS_PENDING = 0,
ADD_STATUS_DUPLICATE,
ADD_STATUS_ERROR,
ADD_STATUS_TRY_AGAIN_LATER,
ADD_STATUS_FILTERED,
ADD_STATUS_COUNT
};
/**
* AccountState stores the following information:
* - mTotalFees: the sum of feeBid() over every transaction for which this
* account is the fee-source (this may include transactions that are not
* in mTransactions)
* - mAge: the number of ledgers that have closed since the last ledger in
* which a transaction in mTransactions was included. This is always 0 if
* mTransactions is empty
* - mTransactions: the list of transactions for which this account is the
* sequence-number-source, ordered by sequence number
*/
struct TimestampedTx
{
TransactionFrameBasePtr mTx;
bool mBroadcasted;
VirtualClock::time_point mInsertionTime;
bool mSubmittedFromSelf;
};
using Transactions = std::vector<TransactionFrameBasePtr>;
struct AccountState
{
int64_t mTotalFees{0};
uint32_t mAge{0};
std::optional<TimestampedTx> mTransaction;
};
explicit TransactionQueue(Application& app, uint32 pendingDepth,
uint32 banDepth, uint32 poolLedgerMultiplier,
bool isSoroban);
virtual ~TransactionQueue();
static std::vector<AssetPair>
findAllAssetPairsInvolvedInPaymentLoops(TransactionFrameBasePtr tx);
AddResult tryAdd(TransactionFrameBasePtr tx, bool submittedFromSelf);
void removeApplied(Transactions const& txs);
// Ban transactions that are no longer valid or have insufficient fee;
// transaction per account limit applies here, so `txs` should have no
// duplicate source accounts
void ban(Transactions const& txs);
/**
* Increase age of each AccountState that has at least one transaction in
* mTransactions. Also increments the age for each banned transaction, and
* unbans transactions for which age equals banDepth.
*/
void shift();
void rebroadcast();
void shutdown();
bool isBanned(Hash const& hash) const;
TransactionFrameBaseConstPtr getTx(Hash const& hash) const;
TxSetTransactions getTransactions(LedgerHeader const& lcl) const;
bool sourceAccountPending(AccountID const& accountID) const;
virtual size_t getMaxQueueSizeOps() const = 0;
#ifdef BUILD_TESTS
AccountState
getAccountTransactionQueueInfo(AccountID const& accountID) const;
size_t countBanned(int index) const;
#endif
protected:
/**
* The AccountState for every account. As noted above, an AccountID is in
* AccountStates iff at least one of the following is true for the
* corresponding AccountState
* - AccountState.mTotalFees > 0
* - !AccountState.mTransactions.empty()
*/
using AccountStates = UnorderedMap<AccountID, AccountState>;
/**
* Banned transactions are stored in deque of depth banDepth, so it is easy
* to unban all transactions that were banned for long enough.
*/
using BannedTransactions = std::deque<UnorderedSet<Hash>>;
Application& mApp;
uint32 const mPendingDepth;
AccountStates mAccountStates;
BannedTransactions mBannedTransactions;
// counters
std::vector<medida::Counter*> mSizeByAge;
medida::Counter& mBannedTransactionsCounter;
medida::Counter& mArbTxSeenCounter;
medida::Counter& mArbTxDroppedCounter;
medida::Timer& mTransactionsDelay;
medida::Timer& mTransactionsSelfDelay;
UnorderedSet<OperationType> mFilteredTypes;
bool mShutdown{false};
bool mWaiting{false};
VirtualTimer mBroadcastTimer;
virtual std::pair<Resource, std::optional<Resource>>
getMaxResourcesToFloodThisPeriod() const = 0;
virtual bool broadcastSome() = 0;
virtual int getFloodPeriod() const = 0;
void broadcast(bool fromCallback);
// broadcasts a single transaction
enum class BroadcastStatus
{
BROADCAST_STATUS_ALREADY,
BROADCAST_STATUS_SUCCESS,
BROADCAST_STATUS_SKIPPED
};
BroadcastStatus broadcastTx(TimestampedTx& tx);
AddResult canAdd(TransactionFrameBasePtr tx,
AccountStates::iterator& stateIter,
std::vector<std::pair<TxStackPtr, bool>>& txsToEvict);
void releaseFeeMaybeEraseAccountState(TransactionFrameBasePtr tx);
void prepareDropTransaction(AccountState& as);
void dropTransaction(AccountStates::iterator stateIter);
void clearAll();
bool isFiltered(TransactionFrameBasePtr tx) const;
std::unique_ptr<TxQueueLimiter> mTxQueueLimiter;
UnorderedMap<AssetPair, uint32_t, AssetPairHash> mArbitrageFloodDamping;
UnorderedMap<Hash, TransactionFrameBasePtr> mKnownTxHashes;
size_t mBroadcastSeed;
#ifdef BUILD_TESTS
public:
size_t getQueueSizeOps() const;
std::optional<int64_t> getInQueueSeqNum(AccountID const& account) const;
std::function<void(TransactionFrameBasePtr&)> mTxBroadcastedEvent;
#endif
};
class SorobanTransactionQueue : public TransactionQueue
{
public:
SorobanTransactionQueue(Application& app, uint32 pendingDepth,
uint32 banDepth, uint32 poolLedgerMultiplier);
int
getFloodPeriod() const override
{
return mApp.getConfig().FLOOD_SOROBAN_TX_PERIOD_MS;
}
size_t getMaxQueueSizeOps() const override;
private:
virtual std::pair<Resource, std::optional<Resource>>
getMaxResourcesToFloodThisPeriod() const override;
virtual bool broadcastSome() override;
std::vector<Resource> mBroadcastOpCarryover;
};
class ClassicTransactionQueue : public TransactionQueue
{
public:
ClassicTransactionQueue(Application& app, uint32 pendingDepth,
uint32 banDepth, uint32 poolLedgerMultiplier);
int
getFloodPeriod() const override
{
return mApp.getConfig().FLOOD_TX_PERIOD_MS;
}
size_t getMaxQueueSizeOps() const override;
private:
virtual std::pair<Resource, std::optional<Resource>>
getMaxResourcesToFloodThisPeriod() const override;
virtual bool broadcastSome() override;
std::vector<Resource> mBroadcastOpCarryover;
};
extern std::array<const char*,
static_cast<int>(
TransactionQueue::AddResult::ADD_STATUS_COUNT)>
TX_STATUS_STRING;
}