Skip to content

Commit c34156d

Browse files
committed
Draft: Recover SCP messages when replaying from history
This PR enables updating the `scphistory` table during catchup from history. Users specify which archives to use via the `SCP_HISTORY_ARCHIVES` config option; if multiple archives are specified, stellar-core merges the messages from those archives.

This is a draft PR: I'm looking for feedback on the approach, but still have some work to do before it is in a mergeable state. Most of the remaining work is to:

* Document the new functionality
* Write additional tests for:
  * Merging
  * Failed downloads
  * No `SCP_HISTORY_ARCHIVES`
  * Multiple `SCP_HISTORY_ARCHIVES`
  * Changes to `ReplayDebugMetaWork`
* Integrate changes from stellar#4121
* Address other `TODO`s in the changes
1 parent aaed4eb commit c34156d

31 files changed

+520
-53
lines changed

Builds/VisualStudio/stellar-core.vcxproj

+2
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ exit /b 0
506506
<ClCompile Include="..\..\src\herder\TxSetUtils.cpp" />
507507
<ClCompile Include="..\..\src\herder\Upgrades.cpp" />
508508
<ClCompile Include="..\..\src\historywork\BatchDownloadWork.cpp" />
509+
<ClCompile Include="..\..\src\historywork\BestEffortBatchDownloadWork.cpp" />
509510
<ClCompile Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.cpp" />
510511
<ClCompile Include="..\..\src\historywork\DownloadBucketsWork.cpp" />
511512
<ClCompile Include="..\..\src\historywork\DownloadVerifyTxResultsWork.cpp" />
@@ -938,6 +939,7 @@ exit /b 0
938939
<ClInclude Include="..\..\src\herder\TxSetUtils.h" />
939940
<ClInclude Include="..\..\src\herder\Upgrades.h" />
940941
<ClInclude Include="..\..\src\historywork\BatchDownloadWork.h" />
942+
<ClInclude Include="..\..\src\historywork\BestEffortBatchDownloadWork.h" />
941943
<ClInclude Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.h" />
942944
<ClInclude Include="..\..\src\historywork\DownloadBucketsWork.h" />
943945
<ClInclude Include="..\..\src\historywork\DownloadVerifyTxResultsWork.h" />

Builds/VisualStudio/stellar-core.vcxproj.filters

+6
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,9 @@
762762
<ClCompile Include="..\..\src\historywork\BatchDownloadWork.cpp">
763763
<Filter>historyWork</Filter>
764764
</ClCompile>
765+
<ClCompile Include="..\..\src\historywork\BestEffortBatchDownloadWork.cpp">
766+
<Filter>historyWork</Filter>
767+
</ClCompile>
765768
<ClCompile Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.cpp">
766769
<Filter>historyWork</Filter>
767770
</ClCompile>
@@ -1916,6 +1919,9 @@
19161919
<ClInclude Include="..\..\src\historywork\BatchDownloadWork.h">
19171920
<Filter>historyWork</Filter>
19181921
</ClInclude>
1922+
<ClInclude Include="..\..\src\historywork\BestEffortBatchDownloadWork.h">
1923+
<Filter>historyWork</Filter>
1924+
</ClInclude>
19191925
<ClInclude Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.h">
19201926
<Filter>historyWork</Filter>
19211927
</ClInclude>

src/catchup/ApplyBufferedLedgersWork.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ ApplyBufferedLedgersWork::onRun()
5555
lcd.getTxSet()->sizeTxTotal(), lcd.getTxSet()->sizeOpTotalForLogging(),
5656
stellarValueToString(mApp.getConfig(), lcd.getValue()));
5757

58-
auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, lcd);
58+
// Pass `nullptr` for `hEntries` because SCP messages of buffered ledgers
59+
// have already been logged.
60+
auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, lcd, nullptr);
5961

6062
auto predicate = [](Application& app) {
6163
auto& bl = app.getBucketManager().getBucketList();

src/catchup/ApplyCheckpointWork.cpp

+81-7
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@
2424
namespace stellar
2525
{
2626

27-
ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
28-
TmpDir const& downloadDir,
29-
LedgerRange const& range,
30-
OnFailureCallback cb)
27+
ApplyCheckpointWork::ApplyCheckpointWork(
28+
Application& app, TmpDir const& downloadDir, LedgerRange const& range,
29+
OnFailureCallback cb, std::shared_ptr<TmpDirVec const>& scpDownloadDirs)
3130
: BasicWork(app,
3231
"apply-ledgers-" + fmt::format(FMT_STRING("{}-{}"),
3332
range.mFirst, range.limit()),
@@ -36,6 +35,7 @@ ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
3635
, mLedgerRange(range)
3736
, mCheckpoint(
3837
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
38+
, mSCPDownloadDirs(scpDownloadDirs)
3939
, mOnFailure(cb)
4040
{
4141
// Ledger range check to enforce application of a single checkpoint
@@ -69,6 +69,11 @@ ApplyCheckpointWork::closeFiles()
6969
{
7070
mHdrIn.close();
7171
mTxIn.close();
72+
for (auto& scpInfo : mSCPCheckpointInfo)
73+
{
74+
scpInfo.scpHistoryIn.close();
75+
}
76+
mSCPCheckpointInfo.clear();
7277
mFilesOpen = false;
7378
}
7479

@@ -83,8 +88,7 @@ void
8388
ApplyCheckpointWork::openInputFiles()
8489
{
8590
ZoneScoped;
86-
mHdrIn.close();
87-
mTxIn.close();
91+
closeFiles();
8892
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
8993
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
9094
mCheckpoint);
@@ -95,6 +99,32 @@ ApplyCheckpointWork::openInputFiles()
9599
mTxIn.open(ti.localPath_nogz());
96100
mTxHistoryEntry = TransactionHistoryEntry();
97101
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
102+
103+
if (mSCPDownloadDirs)
104+
{
105+
// Initialize `SCPCheckpointInfo`s for each SCP history download
106+
// directory.
107+
for (auto const& scpDir : *mSCPDownloadDirs)
108+
{
109+
FileTransferInfo si(*scpDir, HISTORY_FILE_TYPE_SCP, mCheckpoint);
110+
CLOG_DEBUG(History, "Saving SCP messages from {}",
111+
si.localPath_nogz());
112+
mSCPCheckpointInfo.emplace_back();
113+
SCPCheckpointInfo& scpInfo = mSCPCheckpointInfo.back();
114+
scpInfo.scpHistoryEntry = std::make_shared<SCPHistoryEntry>();
115+
try
116+
{
117+
scpInfo.scpHistoryIn.open(si.localPath_nogz());
118+
}
119+
catch (FileSystemException const&)
120+
{
121+
// File doesn't exist for this checkpoint. That's ok. Skip it.
122+
mSCPCheckpointInfo.pop_back();
123+
continue;
124+
}
125+
}
126+
}
127+
98128
mFilesOpen = true;
99129
}
100130

@@ -251,6 +281,49 @@ ApplyCheckpointWork::getNextLedgerCloseData()
251281
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
252282
}
253283

284+
std::unique_ptr<SCPHistoryEntryVec>
285+
ApplyCheckpointWork::getNextSCPHistoryEntries()
286+
{
287+
ZoneScoped;
288+
auto ret = std::make_unique<SCPHistoryEntryVec>();
289+
uint32_t ledgerSeq = mApp.getLedgerManager().getLastClosedLedgerNum();
290+
for (SCPCheckpointInfo& info : mSCPCheckpointInfo)
291+
{
292+
XDRInputFileStream& in = info.scpHistoryIn;
293+
std::shared_ptr<SCPHistoryEntry>& entry = info.scpHistoryEntry;
294+
do
295+
{
296+
uint32 scpHistSeq = entry->v0().ledgerMessages.ledgerSeq;
297+
298+
if (scpHistSeq <= ledgerSeq)
299+
{
300+
// Catching up to `ledgerSeq + 1`
301+
CLOG_DEBUG(History, "Skipping SCP messages for ledger {}",
302+
scpHistSeq);
303+
}
304+
else if (scpHistSeq == ledgerSeq + 1)
305+
{
306+
// Caught up
307+
CLOG_DEBUG(History, "Loaded SCP messages for ledger {}",
308+
ledgerSeq);
309+
ret->push_back(entry);
310+
break;
311+
}
312+
else
313+
{
314+
// Ahead. This archive does not have messages for `ledgerSeq+1`
315+
// TODO: Log which archive is missing the messages (also applies
316+
// to other logging statements in this function)
317+
CLOG_WARNING(History,
318+
"Archive missing SCP messages for ledger {}",
319+
scpHistSeq);
320+
break;
321+
}
322+
} while (in && in.readOne(*entry));
323+
}
324+
return ret;
325+
}
326+
254327
BasicWork::State
255328
ApplyCheckpointWork::onRun()
256329
{
@@ -308,7 +381,8 @@ ApplyCheckpointWork::onRun()
308381
return State::WORK_RUNNING;
309382
}
310383

311-
auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, *lcd);
384+
auto applyLedger = std::make_shared<ApplyLedgerWork>(
385+
mApp, *lcd, getNextSCPHistoryEntries());
312386

313387
auto predicate = [](Application& app) {
314388
auto& bl = app.getBucketManager().getBucketList();

src/catchup/ApplyCheckpointWork.h

+26-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#pragma once
66

7+
#include "catchup/ApplyLedgerWork.h"
78
#include "herder/LedgerCloseData.h"
89
#include "herder/TxSetFrame.h"
910
#include "history/HistoryArchive.h"
@@ -20,6 +21,18 @@ namespace stellar
2021
class TmpDir;
2122
struct LedgerHeaderHistoryEntry;
2223

24+
using TmpDirVec = std::vector<std::shared_ptr<TmpDir const>>;
25+
26+
// This struct stores information about SCP messages in a single history
27+
// checkpoint
28+
struct SCPCheckpointInfo
29+
{
30+
// Input stream holding `SCPHistoryEntry`s
31+
XDRInputFileStream scpHistoryIn;
32+
// Most recent SCP history entry read from `scpHistoryIn`
33+
std::shared_ptr<SCPHistoryEntry> scpHistoryEntry;
34+
};
35+
2336
/**
2437
* This class is responsible for applying transactions stored in files on
2538
* temporary directory (downloadDir) to local ledger. It requires two sets of
@@ -45,11 +58,17 @@ class ApplyCheckpointWork : public BasicWork
4558
TmpDir const& mDownloadDir;
4659
LedgerRange const mLedgerRange;
4760
uint32_t const mCheckpoint;
61+
// The directories containing downloaded SCP history. May be null if no such
62+
// directories exist.
63+
std::shared_ptr<TmpDirVec const> mSCPDownloadDirs;
4864

4965
XDRInputFileStream mHdrIn;
5066
XDRInputFileStream mTxIn;
5167
TransactionHistoryEntry mTxHistoryEntry;
5268
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
69+
// Vector containing each archive's SCP messages for the current checkpoint
70+
// being processed
71+
std::vector<SCPCheckpointInfo> mSCPCheckpointInfo;
5372
OnFailureCallback mOnFailure;
5473

5574
bool mFilesOpen{false};
@@ -61,11 +80,17 @@ class ApplyCheckpointWork : public BasicWork
6180

6281
std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
6382

83+
// Returns a vector holding SCP messages from each archive for the ledger
84+
// being processed. This vector may be smaller than the total number of
85+
// archives if some archives did not contain messages for the ledger.
86+
std::unique_ptr<SCPHistoryEntryVec> getNextSCPHistoryEntries();
87+
6488
void closeFiles();
6589

6690
public:
6791
ApplyCheckpointWork(Application& app, TmpDir const& downloadDir,
68-
LedgerRange const& range, OnFailureCallback cb);
92+
LedgerRange const& range, OnFailureCallback cb,
93+
std::shared_ptr<TmpDirVec const>& scpDownloadDirs);
6994
~ApplyCheckpointWork() = default;
7095
std::string getStatus() const override;
7196
void onFailureRaise() override;

src/catchup/ApplyLedgerWork.cpp

+11-2
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,23 @@
33
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
44

55
#include "catchup/ApplyLedgerWork.h"
6+
#include "herder/HerderPersistence.h"
67
#include "ledger/LedgerManager.h"
78
#include "main/Application.h"
89
#include <Tracy.hpp>
910
#include <fmt/format.h>
1011

1112
namespace stellar
1213
{
13-
ApplyLedgerWork::ApplyLedgerWork(Application& app,
14-
LedgerCloseData const& ledgerCloseData)
14+
ApplyLedgerWork::ApplyLedgerWork(
15+
Application& app, LedgerCloseData const& ledgerCloseData,
16+
std::unique_ptr<SCPHistoryEntryVec const> hEntries)
1517
: BasicWork(
1618
app, "apply-ledger-" + std::to_string(ledgerCloseData.getLedgerSeq()),
1719
BasicWork::RETRY_NEVER)
20+
, mApp(app)
1821
, mLedgerCloseData(ledgerCloseData)
22+
, mHEntries(std::move(hEntries))
1923
{
2024
}
2125

@@ -24,6 +28,11 @@ ApplyLedgerWork::onRun()
2428
{
2529
ZoneScoped;
2630
mApp.getLedgerManager().closeLedger(mLedgerCloseData);
31+
if (mHEntries)
32+
{
33+
mApp.getHerderPersistence().copySCPHistoryFromEntries(
34+
*mHEntries, mLedgerCloseData.getLedgerSeq());
35+
}
2736
return BasicWork::State::WORK_SUCCESS;
2837
}
2938

src/catchup/ApplyLedgerWork.h

+8-1
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,23 @@
66

77
#include "herder/LedgerCloseData.h"
88
#include "work/Work.h"
9+
#include <memory>
910

1011
namespace stellar
1112
{
1213

14+
using SCPHistoryEntryVec = std::vector<std::shared_ptr<SCPHistoryEntry>>;
15+
1316
class ApplyLedgerWork : public BasicWork
1417
{
18+
Application& mApp;
1519
LedgerCloseData const mLedgerCloseData;
20+
// SCP messages for the ledger to be applied
21+
std::unique_ptr<SCPHistoryEntryVec const> mHEntries;
1622

1723
public:
18-
ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData);
24+
ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData,
25+
std::unique_ptr<SCPHistoryEntryVec const> hEntries);
1926

2027
std::string getStatus() const override;
2128

src/catchup/CatchupWork.cpp

+30-4
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
#include "catchup/VerifyLedgerChainWork.h"
1515
#include "herder/Herder.h"
1616
#include "history/FileTransferInfo.h"
17+
#include "history/HistoryArchiveManager.h"
1718
#include "history/HistoryManager.h"
1819
#include "historywork/BatchDownloadWork.h"
20+
#include "historywork/BestEffortBatchDownloadWork.h"
1921
#include "historywork/DownloadBucketsWork.h"
2022
#include "historywork/DownloadVerifyTxResultsWork.h"
2123
#include "historywork/GetAndUnzipRemoteFileWork.h"
@@ -81,7 +83,7 @@ CatchupWork::CatchupWork(Application& app,
8183
std::shared_ptr<HistoryArchive> archive)
8284
: Work(app, "catchup", BasicWork::RETRY_NEVER)
8385
, mLocalState{app.getLedgerManager().getLastClosedLedgerHAS()}
84-
, mDownloadDir{std::make_unique<TmpDir>(
86+
, mDownloadDir{std::make_shared<TmpDir>(
8587
mApp.getTmpDirManager().tmpDir(getName()))}
8688
, mCatchupConfiguration{catchupConfiguration}
8789
, mArchive{archive}
@@ -150,6 +152,7 @@ CatchupWork::doReset()
150152
mHAS.reset();
151153
mBucketHAS.reset();
152154
mRetainedBuckets.clear();
155+
mSCPDownloadDirs->clear();
153156
}
154157

155158
void
@@ -166,7 +169,7 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
166169
// Batch download has default retries ("a few") to ensure we rotate through
167170
// archives
168171
auto getLedgers = std::make_shared<BatchDownloadWork>(
169-
mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir,
172+
mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, mDownloadDir,
170173
mArchive);
171174
mRangeEndPromise = std::promise<LedgerNumHashPair>();
172175
mRangeEndFuture = mRangeEndPromise.get_future().share();
@@ -179,9 +182,31 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
179182
mApp, *mDownloadDir, verifyRange, mLastClosedLedgerHashPair,
180183
mRangeEndFuture, std::move(fatalFailurePromise));
181184

185+
std::vector<std::shared_ptr<BasicWork>> seq{getLedgers, mVerifyLedgers};
186+
187+
Config const& cfg = mApp.getConfig();
188+
// TODO: Test with this flag set to `false`. An empty `mSCPDownloadDirs`
189+
// should prevent any further stages from running.
190+
if (cfg.MODE_STORES_HISTORY_MISC)
191+
{
192+
for (std::string const& scpHistoryArchive : cfg.SCP_HISTORY_ARCHIVES)
193+
{
194+
CLOG_DEBUG(History, "Downloading SCP history from {}",
195+
scpHistoryArchive);
196+
std::shared_ptr<HistoryArchive> scpArchive =
197+
mApp.getHistoryArchiveManager().getHistoryArchive(
198+
scpHistoryArchive);
199+
200+
auto scpDownloadDir = std::make_shared<TmpDir>(
201+
mApp.getTmpDirManager().tmpDir("scp-history-" + scpHistoryArchive));
202+
mSCPDownloadDirs->push_back(scpDownloadDir);
203+
seq.emplace_back(std::make_shared<BestEffortBatchDownloadWork>(
204+
mApp, checkpointRange, HISTORY_FILE_TYPE_SCP, scpDownloadDir,
205+
scpArchive));
206+
}
207+
}
182208
// Never retry the sequence: downloads already have retries, and there's no
183209
// point retrying verification
184-
std::vector<std::shared_ptr<BasicWork>> seq{getLedgers, mVerifyLedgers};
185210
mDownloadVerifyLedgersSeq = addWork<WorkSequence>(
186211
"download-verify-ledgers-seq", seq, BasicWork::RETRY_NEVER);
187212
mCurrentWork = mDownloadVerifyLedgersSeq;
@@ -305,7 +330,8 @@ CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange)
305330
auto waitForPublish = mCatchupConfiguration.offline();
306331
auto range = catchupRange.getReplayRange();
307332
mTransactionsVerifyApplySeq = std::make_shared<DownloadApplyTxsWork>(
308-
mApp, *mDownloadDir, range, mLastApplied, waitForPublish, mArchive);
333+
mApp, *mDownloadDir, range, mLastApplied, waitForPublish,
334+
mSCPDownloadDirs, mArchive);
309335
}
310336

311337
BasicWork::State

0 commit comments

Comments
 (0)