Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue to capture SCP messages for previous ledger in database #4121

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ HerderImpl::processExternalized(uint64 slotIndex, StellarValue const& value,
// save the SCP messages in the database
if (mApp.getConfig().MODE_STORES_HISTORY_MISC)
{
ZoneNamedN(updateSCPHistoryZone, "update SCP history", true);
if (slotIndex != 0)
{
// Save any new SCP messages received about the previous ledger.
// NOTE: This call uses an empty `QuorumTracker::QuorumMap` because
// there is no new quorum map for the previous ledger.
mApp.getHerderPersistence().saveSCPHistory(
static_cast<uint32>(slotIndex - 1),
getSCP().getExternalizingState(slotIndex - 1),
QuorumTracker::QuorumMap());
}
// Store SCP messages received about the current ledger being closed.
mApp.getHerderPersistence().saveSCPHistory(
static_cast<uint32>(slotIndex),
getSCP().getExternalizingState(slotIndex),
Expand Down
22 changes: 17 additions & 5 deletions src/herder/HerderPersistenceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
st.execute(true);
}
}

// Prepare multi-row insert into scphistory
std::vector<std::string> nodeIDs;
std::vector<uint32_t> seqs;
std::vector<std::string> envelopes;
for (auto const& e : envs)
{
auto const& qHash =
Expand All @@ -76,21 +81,28 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq,
std::string envelopeEncoded;
envelopeEncoded = decoder::encode_b64(envelopeBytes);

nodeIDs.push_back(nodeIDStrKey);
seqs.push_back(seq);
envelopes.push_back(envelopeEncoded);
}

if (!envs.empty())
{
// Perform multi-row insert into scphistory
auto prepEnv =
Copy link
Contributor

@SirTyson SirTyson Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change this query to handle collisions or to check for the existence of an entry before inserting it? I think we currently wind up with two copies of every message, since we insert both the currentLedger and currentLedger - 1 messages into the table. INSERT INTO will create a 2nd copy if we insert the same thing twice. I think this is different than the diff between BALLOT and EXTERNALIZE conversation above since this would store identical copies of the same message, but I'm not super well versed on SCP so I could be missing something here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handles the collision case by deleting all entries from currentLedger before performing the INSERT. This also resolves the CONFIRM vs EXTERNALIZE issue.

db.getPreparedStatement("INSERT INTO scphistory "
"(nodeid, ledgerseq, envelope) VALUES "
"(:n, :l, :e)");

auto& st = prepEnv.statement();
st.exchange(soci::use(nodeIDStrKey));
st.exchange(soci::use(seq));
st.exchange(soci::use(envelopeEncoded));
st.exchange(soci::use(nodeIDs, "n"));
st.exchange(soci::use(seqs, "l"));
st.exchange(soci::use(envelopes, "e"));
st.define_and_bind();
{
ZoneNamedN(insertSCPHistoryZone, "insert scphistory", true);
st.execute(true);
}
if (st.get_affected_rows() != 1)
if (st.get_affected_rows() != envs.size())
{
throw std::runtime_error("Could not update data in SQL");
}
Expand Down
152 changes: 152 additions & 0 deletions src/herder/test/HerderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5803,3 +5803,155 @@ TEST_CASE("exclude transactions by operation type", "[herder]")
TransactionQueue::AddResult::ADD_STATUS_PENDING);
}
}

// Test that Herder updates the scphistory table with additional messages from
// ledger `n-1` when closing ledger `n`
TEST_CASE("SCP message capture from previous ledger", "[herder]")
{
constexpr uint32_t version =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION);

// Initialize simulation
auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE);
auto simulation = std::make_shared<Simulation>(
Simulation::OVER_LOOPBACK, networkID, [](int i) {
auto cfg = getTestConfig(i, Config::TESTDB_ON_DISK_SQLITE);
cfg.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION = version;
return cfg;
});

// Create three validators: A, B, and C
auto validatorAKey = SecretKey::fromSeed(sha256("validator-A"));
auto validatorBKey = SecretKey::fromSeed(sha256("validator-B"));
auto validatorCKey = SecretKey::fromSeed(sha256("validator-C"));

// Put all validators in a quorum set of threshold 2
SCPQuorumSet qset;
qset.threshold = 2;
qset.validators.push_back(validatorAKey.getPublicKey());
qset.validators.push_back(validatorBKey.getPublicKey());
qset.validators.push_back(validatorCKey.getPublicKey());

// Connect validators A and B, but leave C disconnected
auto A = simulation->addNode(validatorAKey, qset);
auto B = simulation->addNode(validatorBKey, qset);
auto C = simulation->addNode(validatorCKey, qset);
simulation->addPendingConnection(validatorAKey.getPublicKey(),
validatorBKey.getPublicKey());
simulation->startAllNodes();

// Crank A and B until they're on ledger 2
simulation->crankUntil(
[&]() {
return A->getLedgerManager().getLastClosedLedgerNum() == 2 &&
B->getLedgerManager().getLastClosedLedgerNum() == 2;
},
4 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false);

// Check that a node's scphistory table for a given ledger has the correct
// number of entries of each type in `expectedTypes`
auto checkSCPHistoryEntries =
[&](Application::pointer node, uint32_t ledgerNum,
UnorderedMap<SCPStatementType, size_t> const& expectedTypes) {
// Prepare query
auto& db = node->getDatabase();
auto prep = db.getPreparedStatement(
"SELECT envelope FROM scphistory WHERE ledgerseq = :l");
auto& st = prep.statement();
st.exchange(soci::use(ledgerNum));
std::string envStr;
st.exchange(soci::into(envStr));
st.define_and_bind();
st.execute(false);

// Count the number of entries of each type
UnorderedMap<SCPStatementType, size_t> actualTypes;
while (st.fetch())
{
Value v;
decoder::decode_b64(envStr, v);
SCPEnvelope env;
xdr::xdr_from_opaque(v, env);
++actualTypes[env.statement.pledges.type()];
}

return actualTypes == expectedTypes;
};

// Expected counts of scphistory entry types for ledger 2
UnorderedMap<SCPStatementType, size_t> expConfExt = {
{SCPStatementType::SCP_ST_CONFIRM, 1},
{SCPStatementType::SCP_ST_EXTERNALIZE, 1}};
UnorderedMap<SCPStatementType, size_t> exp2Ext = {
{SCPStatementType::SCP_ST_EXTERNALIZE, 2}};

// Examine scphistory tables for A and B for ledger 2. Either A has 1
// CONFIRM and 1 EXTERNALIZE and B has 2 EXTERNALIZEs, or A has 2
// EXTERNALIZEs and B has 1 CONFIRM and 1 EXTERNALIZE.
REQUIRE((checkSCPHistoryEntries(A, 2, expConfExt) &&
checkSCPHistoryEntries(B, 2, exp2Ext)) ^
(checkSCPHistoryEntries(A, 2, exp2Ext) &&
checkSCPHistoryEntries(B, 2, expConfExt)));

// C has no entries in its scphistory table for ledger 2.
REQUIRE(checkSCPHistoryEntries(C, 2, {}));

// Get messages from A and B
HerderImpl& herderA = dynamic_cast<HerderImpl&>(A->getHerder());
HerderImpl& herderB = dynamic_cast<HerderImpl&>(B->getHerder());
std::vector<SCPEnvelope> AEnvs = herderA.getSCP().getLatestMessagesSend(2);
std::vector<SCPEnvelope> BEnvs = herderB.getSCP().getLatestMessagesSend(2);

// Pass A and B's messages to C
for (auto const& env : AEnvs)
{
C->getHerder().recvSCPEnvelope(env);
}
for (auto const& env : BEnvs)
{
C->getHerder().recvSCPEnvelope(env);
}

// Crank C until it is on ledger 2
simulation->crankUntil(
[&]() { return C->getLedgerManager().getLastClosedLedgerNum() == 2; },
4 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false);

// Get messages from C
HerderImpl& herderC = dynamic_cast<HerderImpl&>(C->getHerder());
std::vector<SCPEnvelope> CEnvs = herderC.getSCP().getLatestMessagesSend(2);

// Pass C's messages to A and B
for (auto const& env : CEnvs)
{
A->getHerder().recvSCPEnvelope(env);
B->getHerder().recvSCPEnvelope(env);
}

// Crank A and B until they're on ledger 3
simulation->crankUntil(
[&]() {
return A->getLedgerManager().getLastClosedLedgerNum() == 3 &&
B->getLedgerManager().getLastClosedLedgerNum() == 3;
},
4 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false);

// A and B should now each have 3 EXTERNALIZEs in their scphistory table for
// ledger 2. A's CONFIRM entry has been replaced with an EXTERNALIZE.
UnorderedMap<SCPStatementType, size_t> const expectedTypes = {
{SCPStatementType::SCP_ST_EXTERNALIZE, 3}};
REQUIRE(checkSCPHistoryEntries(A, 2, expectedTypes));
REQUIRE(checkSCPHistoryEntries(B, 2, expectedTypes));

// Connect C to B and crank C to catch up with A and B
simulation->addConnection(validatorCKey.getPublicKey(),
validatorBKey.getPublicKey());
simulation->crankUntil(
[&]() { return C->getLedgerManager().getLastClosedLedgerNum() == 3; },
4 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false);

// C should have 3 EXTERNALIZEs in its scphistory table for ledger 2. This
// check ensures that C does not double count messages from ledger 2 when
// closing ledger 3.
REQUIRE(checkSCPHistoryEntries(C, 2, expectedTypes));
}
Loading