Skip to content

Commit 0865066

Browse files
committed
Added state archival getledgerentry http endpoint
1 parent c6d900c commit 0865066

5 files changed

+525
-13
lines changed

src/bucket/BucketSnapshotManager.cpp

+34-6
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
115115
return iter->second;
116116
}
117117

118+
namespace
119+
{
120+
template <typename T, typename U>
121+
bool
122+
needsUpdate(std::shared_ptr<T const> const& snapshot,
123+
SnapshotPtrT<U> const& curr)
124+
{
125+
return !snapshot || snapshot->getLedgerSeq() < curr->getLedgerSeq();
126+
}
127+
}
128+
118129
void
119130
BucketSnapshotManager::maybeCopySearchableBucketListSnapshot(
120131
SearchableSnapshotConstPtr& snapshot)
@@ -123,9 +134,7 @@ BucketSnapshotManager::maybeCopySearchableBucketListSnapshot(
123134
// modified. Rather, a thread is checking it's copy against the canonical
124135
// snapshot, so use a shared lock.
125136
std::shared_lock<std::shared_mutex> lock(mSnapshotMutex);
126-
127-
if (!snapshot ||
128-
snapshot->getLedgerSeq() < mCurrLiveSnapshot->getLedgerSeq())
137+
if (needsUpdate(snapshot, mCurrLiveSnapshot))
129138
{
130139
snapshot = copySearchableLiveBucketListSnapshot();
131140
}
@@ -139,14 +148,33 @@ BucketSnapshotManager::maybeCopySearchableHotArchiveBucketListSnapshot(
139148
// modified. Rather, a thread is checking it's copy against the canonical
140149
// snapshot, so use a shared lock.
141150
std::shared_lock<std::shared_mutex> lock(mSnapshotMutex);
142-
143-
if (!snapshot ||
144-
snapshot->getLedgerSeq() < mCurrHotArchiveSnapshot->getLedgerSeq())
151+
if (needsUpdate(snapshot, mCurrHotArchiveSnapshot))
145152
{
146153
snapshot = copySearchableHotArchiveBucketListSnapshot();
147154
}
148155
}
149156

157+
void
158+
BucketSnapshotManager::maybeCopyLiveAndHotArchiveSnapshots(
159+
SearchableSnapshotConstPtr& liveSnapshot,
160+
SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot)
161+
{
162+
// The canonical snapshot held by the BucketSnapshotManager is not being
163+
// modified. Rather, a thread is checking it's copy against the canonical
164+
// snapshot, so use a shared lock. For consistency we hold the lock while
165+
// updating both snapshots.
166+
std::shared_lock<std::shared_mutex> lock(mSnapshotMutex);
167+
if (needsUpdate(liveSnapshot, mCurrLiveSnapshot))
168+
{
169+
liveSnapshot = copySearchableLiveBucketListSnapshot();
170+
}
171+
172+
if (needsUpdate(hotArchiveSnapshot, mCurrHotArchiveSnapshot))
173+
{
174+
hotArchiveSnapshot = copySearchableHotArchiveBucketListSnapshot();
175+
}
176+
}
177+
150178
void
151179
BucketSnapshotManager::updateCurrentSnapshot(
152180
SnapshotPtrT<LiveBucket>&& liveSnapshot,

src/bucket/BucketSnapshotManager.h

+7
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ class BucketSnapshotManager : NonMovableOrCopyable
9999
void maybeCopySearchableHotArchiveBucketListSnapshot(
100100
SearchableHotArchiveSnapshotConstPtr& snapshot);
101101

102+
// This function is the same as snapshot refreshers above, but guarantees
103+
// that both snapshots are consistent with the same lcl. This is required
104+
// when querying both snapshot types as part of the same query.
105+
void maybeCopyLiveAndHotArchiveSnapshots(
106+
SearchableSnapshotConstPtr& liveSnapshot,
107+
SearchableHotArchiveSnapshotConstPtr& hotArchiveSnapshot);
108+
102109
// All metric recording functions must only be called by the main thread
103110
void startPointLoadTimer() const;
104111
void endPointLoadTimer(LedgerEntryType t, bool bloomMiss) const;

src/main/QueryServer.cpp

+220-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
#include "bucket/BucketSnapshotManager.h"
77
#include "bucket/SearchableBucketList.h"
88
#include "ledger/LedgerTxnImpl.h"
9+
#include "ledger/LedgerTypeUtils.h"
910
#include "util/Logging.h"
1011
#include "util/XDRStream.h" // IWYU pragma: keep
12+
#include "util/types.h"
1113
#include <exception>
1214
#include <json/json.h>
1315

@@ -54,7 +56,12 @@ namespace stellar
5456
{
5557
QueryServer::QueryServer(const std::string& address, unsigned short port,
5658
int maxClient, size_t threadPoolSize,
57-
BucketSnapshotManager& bucketSnapshotManager)
59+
BucketSnapshotManager& bucketSnapshotManager
60+
#ifdef BUILD_TESTS
61+
,
62+
bool useMainThreadForTesting
63+
#endif
64+
)
5865
: mServer(address, port, maxClient, threadPoolSize)
5966
, mBucketSnapshotManager(bucketSnapshotManager)
6067
{
@@ -63,12 +70,28 @@ QueryServer::QueryServer(const std::string& address, unsigned short port,
6370

6471
mServer.add404(std::bind(&QueryServer::notFound, this, _1, _2, _3));
6572
addRoute("getledgerentryraw", &QueryServer::getLedgerEntryRaw);
73+
addRoute("getledgerentry", &QueryServer::getLedgerEntry);
6674

67-
auto workerPids = mServer.start();
68-
for (auto pid : workerPids)
75+
#ifdef BUILD_TESTS
76+
if (useMainThreadForTesting)
6977
{
70-
mBucketListSnapshots[pid] = std::move(
71-
bucketSnapshotManager.copySearchableLiveBucketListSnapshot());
78+
mBucketListSnapshots[std::this_thread::get_id()] =
79+
bucketSnapshotManager.copySearchableLiveBucketListSnapshot();
80+
mHotArchiveBucketListSnapshots[std::this_thread::get_id()] =
81+
bucketSnapshotManager.copySearchableHotArchiveBucketListSnapshot();
82+
}
83+
else
84+
#endif
85+
{
86+
auto workerPids = mServer.start();
87+
for (auto pid : workerPids)
88+
{
89+
mBucketListSnapshots[pid] =
90+
bucketSnapshotManager.copySearchableLiveBucketListSnapshot();
91+
mHotArchiveBucketListSnapshots[pid] =
92+
bucketSnapshotManager
93+
.copySearchableHotArchiveBucketListSnapshot();
94+
}
7295
}
7396
}
7497

@@ -190,4 +213,196 @@ QueryServer::getLedgerEntryRaw(std::string const& params,
190213
retStr = Json::FastWriter().write(root);
191214
return true;
192215
}
216+
217+
// This query needs to load all the given ledger entries and their "state"
218+
// (live, archived, evicted, new). This requires a loading entry and TTL from
219+
// the live BucketList and then checking the Hot Archive for any keys we didn't
220+
// find. We do three passes:
221+
// 1. Load all keys from the live BucketList
222+
// 2. For any Soroban keys not in the live BucketList, load them from the Hot
223+
// Archive
224+
// 3. Load TTL keys for any live Soroban entries found in 1.
225+
bool
226+
QueryServer::getLedgerEntry(std::string const& params, std::string const& body,
227+
std::string& retStr)
228+
{
229+
ZoneScoped;
230+
Json::Value root;
231+
232+
std::map<std::string, std::vector<std::string>> paramMap;
233+
httpThreaded::server::server::parsePostParams(body, paramMap);
234+
235+
auto keys = paramMap["key"];
236+
auto snapshotLedger = parseOptionalParam<uint32_t>(paramMap, "ledgerSeq");
237+
238+
if (keys.empty())
239+
{
240+
throw std::invalid_argument(
241+
"Must specify ledger key in POST body: key=<LedgerKey in base64 "
242+
"XDR format>");
243+
}
244+
245+
// Get snapshots for both live and hot archive bucket lists
246+
auto& liveBl = mBucketListSnapshots.at(std::this_thread::get_id());
247+
auto& hotArchiveBl =
248+
mHotArchiveBucketListSnapshots.at(std::this_thread::get_id());
249+
250+
// orderedNotFoundKeys is a set of keys we have not yet found (not in live
251+
// BucketList or in an archived state in the Hot Archive)
252+
LedgerKeySet orderedNotFoundKeys;
253+
for (auto const& key : keys)
254+
{
255+
LedgerKey k;
256+
fromOpaqueBase64(k, key);
257+
258+
// Check for TTL keys which are not allowed
259+
if (k.type() == TTL)
260+
{
261+
retStr = "TTL keys are not allowed";
262+
return false;
263+
}
264+
265+
orderedNotFoundKeys.emplace(k);
266+
}
267+
268+
mBucketSnapshotManager.maybeCopyLiveAndHotArchiveSnapshots(liveBl,
269+
hotArchiveBl);
270+
271+
std::vector<LedgerEntry> liveEntries;
272+
std::vector<HotArchiveBucketEntry> archivedEntries;
273+
uint32_t ledgerSeq =
274+
snapshotLedger ? *snapshotLedger : liveBl->getLedgerSeq();
275+
root["ledgerSeq"] = ledgerSeq;
276+
277+
auto liveEntriesOp =
278+
liveBl->loadKeysFromLedger(orderedNotFoundKeys, ledgerSeq);
279+
280+
// Return 404 if ledgerSeq not found
281+
if (!liveEntriesOp)
282+
{
283+
retStr = "LedgerSeq not found";
284+
return false;
285+
}
286+
287+
liveEntries = std::move(*liveEntriesOp);
288+
289+
// Remove keys found in live bucketList
290+
for (auto const& le : liveEntries)
291+
{
292+
orderedNotFoundKeys.erase(LedgerEntryKey(le));
293+
}
294+
295+
LedgerKeySet hotArchiveKeysToSearch;
296+
for (auto const& lk : orderedNotFoundKeys)
297+
{
298+
if (isSorobanEntry(lk))
299+
{
300+
hotArchiveKeysToSearch.emplace(lk);
301+
}
302+
}
303+
304+
// Only query archive for remaining keys
305+
if (!hotArchiveKeysToSearch.empty())
306+
{
307+
auto archivedEntriesOp =
308+
hotArchiveBl->loadKeysFromLedger(hotArchiveKeysToSearch, ledgerSeq);
309+
if (!archivedEntriesOp)
310+
{
311+
retStr = "LedgerSeq not found";
312+
return false;
313+
}
314+
archivedEntries = std::move(*archivedEntriesOp);
315+
}
316+
317+
// Collect TTL keys for Soroban entries in the live BucketList
318+
LedgerKeySet ttlKeys;
319+
for (auto const& le : liveEntries)
320+
{
321+
if (isSorobanEntry(le.data))
322+
{
323+
ttlKeys.emplace(getTTLKey(le));
324+
}
325+
}
326+
327+
std::vector<LedgerEntry> ttlEntries;
328+
if (!ttlKeys.empty())
329+
{
330+
// We haven't updated the live snapshot so we will never not have the
331+
// requested ledgerSeq and return nullopt.
332+
ttlEntries =
333+
std::move(liveBl->loadKeysFromLedger(ttlKeys, ledgerSeq).value());
334+
}
335+
336+
std::unordered_map<LedgerKey, LedgerEntry> ttlMap;
337+
for (auto const& ttlEntry : ttlEntries)
338+
{
339+
ttlMap.emplace(LedgerEntryKey(ttlEntry), ttlEntry);
340+
}
341+
342+
// Process live entries
343+
for (auto const& le : liveEntries)
344+
{
345+
Json::Value entry;
346+
entry["e"] = toOpaqueBase64(le);
347+
348+
// Check TTL for Soroban entries
349+
if (isSorobanEntry(le.data))
350+
{
351+
auto ttlIter = ttlMap.find(getTTLKey(le));
352+
releaseAssertOrThrow(ttlIter != ttlMap.end());
353+
if (isLive(ttlIter->second, ledgerSeq))
354+
{
355+
entry["state"] = "live";
356+
entry["ttl"] = ttlIter->second.data.ttl().liveUntilLedgerSeq;
357+
}
358+
else
359+
{
360+
entry["state"] = "archived";
361+
}
362+
}
363+
else
364+
{
365+
entry["state"] = "live";
366+
}
367+
368+
root["entries"].append(entry);
369+
}
370+
371+
// Process archived entries - all are evicted since they come from hot
372+
// archive
373+
for (auto const& be : archivedEntries)
374+
{
375+
// If we get to this point, we know the key is not in the live
376+
// BucketList, so if we get a DELETED or RESTORED entry, the entry is
377+
// new wrt ledger state.
378+
if (be.type() != HOT_ARCHIVE_ARCHIVED)
379+
{
380+
continue;
381+
}
382+
383+
auto const& le = be.archivedEntry();
384+
385+
// At this point we've "found" the key and know it's archived, so remove
386+
// it from our search set
387+
orderedNotFoundKeys.erase(LedgerEntryKey(le));
388+
389+
Json::Value entry;
390+
entry["e"] = toOpaqueBase64(le);
391+
entry["state"] = "evicted";
392+
root["entries"].append(entry);
393+
}
394+
395+
// Since we removed entries found in the live BucketList and archived
396+
// entries found in the Hot Archive, any remaining keys must be new.
397+
for (auto const& key : orderedNotFoundKeys)
398+
{
399+
Json::Value entry;
400+
entry["e"] = toOpaqueBase64(key);
401+
entry["state"] = "new";
402+
root["entries"].append(entry);
403+
}
404+
405+
retStr = Json::FastWriter().write(root);
406+
return true;
407+
}
193408
}

src/main/QueryServer.h

+16-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ class QueryServer
2929
std::unordered_map<std::thread::id, SearchableSnapshotConstPtr>
3030
mBucketListSnapshots;
3131

32+
std::unordered_map<std::thread::id, SearchableHotArchiveSnapshotConstPtr>
33+
mHotArchiveBucketListSnapshots;
34+
3235
BucketSnapshotManager& mBucketSnapshotManager;
3336

3437
bool safeRouter(HandlerRoute route, std::string const& params,
@@ -39,14 +42,25 @@ class QueryServer
3942

4043
void addRoute(std::string const& name, HandlerRoute route);
4144

45+
#ifdef BUILD_TESTS
46+
public:
47+
#endif
4248
// Returns raw LedgerKeys for the given keys from the Live BucketList. Does
4349
// not query other BucketLists or reason about archival.
4450
bool getLedgerEntryRaw(std::string const& params, std::string const& body,
4551
std::string& retStr);
4652

53+
bool getLedgerEntry(std::string const& params, std::string const& body,
54+
std::string& retStr);
55+
4756
public:
4857
QueryServer(const std::string& address, unsigned short port, int maxClient,
4958
size_t threadPoolSize,
50-
BucketSnapshotManager& bucketSnapshotManager);
59+
BucketSnapshotManager& bucketSnapshotManager
60+
#ifdef BUILD_TESTS
61+
,
62+
bool useMainThreadForTesting = false
63+
#endif
64+
);
5165
};
52-
}
66+
}

0 commit comments

Comments
 (0)