Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify tx queue, remove transaction chains support #4196

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ HerderImpl::broadcast(SCPEnvelope const& e)
e.statement.slotIndex);

mSCPMetrics.mEnvelopeEmit.Mark();
mApp.getOverlayManager().broadcastMessage(m, false);
mApp.getOverlayManager().broadcastMessage(m);
}
}

Expand Down
552 changes: 150 additions & 402 deletions src/herder/TransactionQueue.cpp

Large diffs are not rendered by default.

36 changes: 8 additions & 28 deletions src/herder/TransactionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,6 @@ class TransactionQueue
ADD_STATUS_COUNT
};

/*
* Information about queue of transaction for given account. mAge and
* mTotalFees are stored in queue, but mMaxSeq must be computed each
* time (its O(1) anyway).
*/
struct AccountTxQueueInfo
{
SequenceNumber mMaxSeq{0};
int64_t mTotalFees{0};
size_t mQueueSizeOps{0};
size_t mBroadcastQueueOps{0};
uint32_t mAge{0};

friend bool operator==(AccountTxQueueInfo const& x,
AccountTxQueueInfo const& y);
};

/**
* AccountState stores the following information:
* - mTotalFees: the sum of feeBid() over every transaction for which this
Expand All @@ -108,15 +91,12 @@ class TransactionQueue
VirtualClock::time_point mInsertionTime;
bool mSubmittedFromSelf;
};
using TimestampedTransactions = std::vector<TimestampedTx>;
using Transactions = std::vector<TransactionFrameBasePtr>;
struct AccountState
{
int64_t mTotalFees{0};
size_t mQueueSizeOps{0};
size_t mBroadcastQueueOps{0};
uint32_t mAge{0};
TimestampedTransactions mTransactions;
std::optional<TimestampedTx> mTransaction;
};

explicit TransactionQueue(Application& app, uint32 pendingDepth,
Expand All @@ -129,6 +109,9 @@ class TransactionQueue

AddResult tryAdd(TransactionFrameBasePtr tx, bool submittedFromSelf);
void removeApplied(Transactions const& txs);
// Ban transactions that are no longer valid or have insufficient fee;
// transaction per account limit applies here, so `txs` should have no
// duplicate source accounts
void ban(Transactions const& txs);

/**
Expand All @@ -148,7 +131,7 @@ class TransactionQueue
virtual size_t getMaxQueueSizeOps() const = 0;

#ifdef BUILD_TESTS
AccountTxQueueInfo
AccountState
getAccountTransactionQueueInfo(AccountID const& accountID) const;
size_t countBanned(int index) const;
#endif
Expand Down Expand Up @@ -202,18 +185,15 @@ class TransactionQueue
BROADCAST_STATUS_SUCCESS,
BROADCAST_STATUS_SKIPPED
};
BroadcastStatus broadcastTx(AccountState& state, TimestampedTx& tx);
BroadcastStatus broadcastTx(TimestampedTx& tx);
AddResult canAdd(TransactionFrameBasePtr tx,
AccountStates::iterator& stateIter,
TimestampedTransactions::iterator& oldTxIter,
std::vector<std::pair<TxStackPtr, bool>>& txsToEvict);

void releaseFeeMaybeEraseAccountState(TransactionFrameBasePtr tx);

void prepareDropTransaction(AccountState& as, TimestampedTx& tstx);
void dropTransactions(AccountStates::iterator stateIter,
TimestampedTransactions::iterator begin,
TimestampedTransactions::iterator end);
void prepareDropTransaction(AccountState& as);
void dropTransaction(AccountStates::iterator stateIter);

void clearAll();

Expand Down
41 changes: 27 additions & 14 deletions src/herder/test/TransactionQueueTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ class TransactionQueueTest
REQUIRE(size - toRemove.size() >=
mTransactionQueue.getTransactions({}).size());
}

// Everything that got removed should have age=0
for (auto const& tx : toRemove)
{
auto txInfo = mTransactionQueue.getAccountTransactionQueueInfo(
tx->getSourceID());
REQUIRE(txInfo.mAge == 0);
}
}

void
Expand Down Expand Up @@ -201,11 +209,16 @@ class TransactionQueueTest
accountState.mAccountID);
REQUIRE(accountTransactionQueueInfo.mTotalFees ==
expectedFees[accountState.mAccountID]);
REQUIRE(accountTransactionQueueInfo.mMaxSeq == seqNum);
auto queueSeqNum =
accountTransactionQueueInfo.mTransaction
? accountTransactionQueueInfo.mTransaction->mTx->getSeqNum()
: 0;
totOps += accountTransactionQueueInfo.mTransaction
? accountTransactionQueueInfo.mTransaction->mTx
->getNumOperations()
: 0;
REQUIRE(queueSeqNum == seqNum);
REQUIRE(accountTransactionQueueInfo.mAge == accountState.mAge);
REQUIRE(accountTransactionQueueInfo.mBroadcastQueueOps ==
accountTransactionQueueInfo.mQueueSizeOps);
totOps += accountTransactionQueueInfo.mQueueSizeOps;

expectedTxs.insert(expectedTxs.end(),
accountState.mAccountTransactions.begin(),
Expand Down Expand Up @@ -543,19 +556,21 @@ testTransactionQueueBasicScenarios()
test.add(txSeqA1T1, TransactionQueue::AddResult::ADD_STATUS_DUPLICATE);
test.check(state);

auto status = TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER;
test.add(txSeqA1T2, status);
test.add(txSeqA1T2,
TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER);
test.check(state);

// Regardless of seqnum validity, tx is rejected due to limit
// Tx is rejected due to limit or bad seqnum
// too low
test.add(txSeqA1T0, status);
test.add(txSeqA1T0, TransactionQueue::AddResult::ADD_STATUS_ERROR);
test.check(state);
// too high
test.add(txSeqA1T4, status);
test.add(txSeqA1T4,
TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER);
test.check(state);
// just right
test.add(txSeqA1T3, status);
test.add(txSeqA1T3,
TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER);
test.check(state);
}

Expand Down Expand Up @@ -990,14 +1005,12 @@ TEST_CASE_VERSIONS("TransactionQueue with PreconditionsV2",
test.add(txSeqA1S5MinSeqNum,
TransactionQueue::AddResult::ADD_STATUS_DUPLICATE);

// try fill gap (invalid behavior), but account limit kicks in first
// try to fill in gap with a tx
test.add(txSeqA1S2,
TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER);
test.add(txSeqA1S2, TransactionQueue::AddResult::ADD_STATUS_ERROR);

// try to fill in gap with a minSeqNum tx
test.add(txSeqA1S4MinSeqNum,
TransactionQueue::AddResult::ADD_STATUS_TRY_AGAIN_LATER);
TransactionQueue::AddResult::ADD_STATUS_ERROR);

test.check({{{account1, 0, {txSeqA1S5MinSeqNum}}, {account2}}, {}});

Expand Down
5 changes: 2 additions & 3 deletions src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)

// send message to anyone you haven't gotten it from
bool
Floodgate::broadcast(StellarMessage const& msg, bool force,
std::optional<Hash> const& hash)
Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
{
ZoneScoped;
if (mShuttingDown)
Expand All @@ -102,7 +101,7 @@ Floodgate::broadcast(StellarMessage const& msg, bool force,

FloodRecord::pointer fr;
auto result = mFloodMap.find(index);
if (result == mFloodMap.end() || force)
if (result == mFloodMap.end())
{ // no one has sent us this message / start from scratch
fr = std::make_shared<FloodRecord>(
mApp.getHerder().trackingConsensusLedgerIndex(), Peer::pointer());
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Floodgate

// returns true if msg was sent to at least one peer
// The hash required for transactions
bool broadcast(StellarMessage const& msg, bool force,
bool broadcast(StellarMessage const& msg,
std::optional<Hash> const& hash = std::nullopt);

// returns the list of peers that sent us the item with hash `msgID`
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class OverlayManager
// When passing a transaction message,
// the hash of TransactionEnvelope must be passed also for pull mode.
virtual bool
broadcastMessage(StellarMessage const& msg, bool force = false,
broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt) = 0;

// Make a note in the FloodGate that a given peer has provided us with a
Expand Down
4 changes: 2 additions & 2 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,11 +1151,11 @@ OverlayManagerImpl::forgetFloodedMsg(Hash const& msgID)
}

bool
OverlayManagerImpl::broadcastMessage(StellarMessage const& msg, bool force,
OverlayManagerImpl::broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash)
{
ZoneScoped;
auto res = mFloodGate.broadcast(msg, force, hash);
auto res = mFloodGate.broadcast(msg, hash);
if (res)
{
mOverlayMetrics.mMessagesBroadcast.Mark();
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class OverlayManagerImpl : public OverlayManager
Hash& msgID) override;
void forgetFloodedMsg(Hash const& msgID) override;
bool
broadcastMessage(StellarMessage const& msg, bool force = false,
broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt) override;
void connectTo(PeerBareAddress const& address) override;

Expand Down
2 changes: 1 addition & 1 deletion src/overlay/SurveyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
void
SurveyManager::broadcast(StellarMessage const& msg) const
{
mApp.getOverlayManager().broadcastMessage(msg, false);
mApp.getOverlayManager().broadcastMessage(msg);
}

void
Expand Down
19 changes: 11 additions & 8 deletions src/overlay/test/FloodTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ TEST_CASE("Flooding", "[flood][overlay][acceptance]")
auto msg = tx1->toStellarMessage();
auto res = inApp->getHerder().recvTransaction(tx1, false);
REQUIRE(res == TransactionQueue::AddResult::ADD_STATUS_PENDING);
inApp->getOverlayManager().broadcastMessage(msg, false,
inApp->getOverlayManager().broadcastMessage(msg,
tx1->getFullHash());
};

Expand All @@ -171,11 +171,14 @@ TEST_CASE("Flooding", "[flood][overlay][acceptance]")

for (auto const& s : sources)
{
okCount += (herder.getTransactionQueue()
.getAccountTransactionQueueInfo(s)
.mMaxSeq == expectedSeq)
? 1
: 0;
auto accState =
herder.getTransactionQueue().getAccountTransactionQueueInfo(
s);
auto seqNum = accState.mTransaction
? accState.mTransaction->mTx->getSeqNum()
: 0;

okCount += !!(seqNum == expectedSeq);
}
bool res = okCount == sources.size();
LOG_DEBUG(DEFAULT_LOG, "{}{}{} / {} authenticated peers: {}",
Expand Down Expand Up @@ -277,7 +280,7 @@ TEST_CASE("Flooding", "[flood][overlay][acceptance]")
auto before =
overlaytestutils::getAdvertisedHashCount(node);
node->getOverlayManager().broadcastMessage(
testTransaction->toStellarMessage(), false,
testTransaction->toStellarMessage(),
testTransaction->getFullHash());
REQUIRE(before ==
overlaytestutils::getAdvertisedHashCount(node));
Expand All @@ -301,7 +304,7 @@ TEST_CASE("Flooding", "[flood][overlay][acceptance]")
auto before =
overlaytestutils::getAdvertisedHashCount(node);
node->getOverlayManager().broadcastMessage(
testTransaction->toStellarMessage(), false,
testTransaction->toStellarMessage(),
testTransaction->getFullHash());

REQUIRE(before + 1 ==
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/test/OverlayManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class OverlayManagerTests
}
}
auto broadcastTxnMsg = [&](auto msg) {
pm.broadcastMessage(msg, false, xdrSha256(msg.transaction()));
pm.broadcastMessage(msg, xdrSha256(msg.transaction()));
};
broadcastTxnMsg(AtoB);
crank(10);
Expand Down
3 changes: 1 addition & 2 deletions src/simulation/LoadGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2027,8 +2027,7 @@ LoadGenerator::execute(TransactionFramePtr& txf, LoadGenMode mode,
}
else
{
mApp.getOverlayManager().broadcastMessage(msg, false,
txf->getFullHash());
mApp.getOverlayManager().broadcastMessage(msg, txf->getFullHash());
}

return status;
Expand Down
Loading