Commit 0ba78fb
Optimize the parallel tx set building implementation.

Also add a simple benchmark in case further improvements are necessary.

The changes resulted in up to 20x speedup for some scenarios on my laptop:

- Avoid unnecessary copies of clusters (use shared ptr instead)
- Precompute conflicting transactions, which speeds up low-conflict scenarios very significantly (and this is what we expect to see the most)
- Some minor optimizations to avoid doing work when possible

Also updated a flaky test scenario.
1 parent 46e2e52 commit 0ba78fb
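The core of the speedup is the precomputed conflict sets described in the second bullet. As a rough standalone sketch of the idea only — std::string stands in for LedgerKey and a vector<bool> matrix stands in for the BitSet conflict sets; none of this is the stellar-core code itself:

#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative stand-in for a transaction's footprint.
struct TxFootprint
{
    std::vector<std::string> readOnly;
    std::vector<std::string> readWrite;
};

// Two transactions conflict iff they share a footprint key and at least one
// of them has that key in its read-write footprint. Building key -> tx-ids
// maps first keeps the no-conflict case linear in the total footprint size.
std::vector<std::vector<bool>>
precomputeConflicts(std::vector<TxFootprint> const& txs)
{
    std::unordered_map<std::string, std::vector<size_t>> txsWithRoKey;
    std::unordered_map<std::string, std::vector<size_t>> txsWithRwKey;
    for (size_t i = 0; i < txs.size(); ++i)
    {
        for (auto const& key : txs[i].readOnly)
        {
            txsWithRoKey[key].push_back(i);
        }
        for (auto const& key : txs[i].readWrite)
        {
            txsWithRwKey[key].push_back(i);
        }
    }
    std::vector<std::vector<bool>> conflicts(
        txs.size(), std::vector<bool>(txs.size(), false));
    auto mark = [&conflicts](size_t a, size_t b) {
        conflicts[a][b] = conflicts[b][a] = true;
    };
    for (auto const& [key, rwTxIds] : txsWithRwKey)
    {
        // RW-RW: every pair of writers of the same key conflicts.
        for (size_t i = 0; i < rwTxIds.size(); ++i)
        {
            for (size_t j = i + 1; j < rwTxIds.size(); ++j)
            {
                mark(rwTxIds[i], rwTxIds[j]);
            }
        }
        // RO-RW: every reader of the key conflicts with every writer.
        auto roIt = txsWithRoKey.find(key);
        if (roIt != txsWithRoKey.end())
        {
            for (size_t reader : roIt->second)
            {
                for (size_t writer : rwTxIds)
                {
                    mark(reader, writer);
                }
            }
        }
    }
    return conflicts;
}

When no keys are shared, the nested loops never run, matching the complexity notes in the diff below.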

File tree

2 files changed: +406 -97 lines changed

src/herder/ParallelTxSetBuilder.cpp: +120 -86
@@ -44,56 +44,42 @@ struct BuilderTx
 {
     size_t mId = 0;
     uint32_t mInstructions = 0;
-    BitSet mReadOnlyFootprint;
-    BitSet mReadWriteFootprint;
+    // Set of ids of transactions that conflict with this transaction.
+    BitSet mConflictTxs;
 
-    BuilderTx(size_t txId, TransactionFrameBase const& tx,
-              UnorderedMap<LedgerKey, size_t> const& entryIdMap)
+    BuilderTx(size_t txId, TransactionFrameBase const& tx)
         : mId(txId), mInstructions(tx.sorobanResources().instructions)
     {
-        auto const& footprint = tx.sorobanResources().footprint;
-        for (auto const& key : footprint.readOnly)
-        {
-            mReadOnlyFootprint.set(entryIdMap.at(key));
-        }
-        for (auto const& key : footprint.readWrite)
-        {
-            mReadWriteFootprint.set(entryIdMap.at(key));
-        }
     }
 };
 
-// Cluster of (potentialy transitively) dependent transactions.
+// Cluster of (potentially transitively) dependent transactions.
 // Transactions are considered to be dependent if the have the same key in
 // their footprints and for at least one of them this key belongs to read-write
 // footprint.
 struct Cluster
 {
     // Total number of instructions in the cluster. Since transactions are
-    // dependenent, these are always 'sequential' instructions.
+    // dependent, these are always 'sequential' instructions.
     uint64_t mInstructions = 0;
-    // Union of read-only footprints of all transactions in the cluster.
-    BitSet mReadOnlyEntries;
-    // Union of read-write footprints of all transactions in the cluster.
-    BitSet mReadWriteEntries;
+    // Set of ids of transactions that conflict with this cluster.
+    BitSet mConflictTxs;
     // Set of transaction ids in the cluster.
     BitSet mTxIds;
    // Id of the bin within a stage in which the cluster is packed.
-    size_t mBinId = 0;
+    size_t mutable mBinId = 0;
 
     explicit Cluster(BuilderTx const& tx) : mInstructions(tx.mInstructions)
    {
-        mReadOnlyEntries.inplaceUnion(tx.mReadOnlyFootprint);
-        mReadWriteEntries.inplaceUnion(tx.mReadWriteFootprint);
+        mConflictTxs.inplaceUnion(tx.mConflictTxs);
         mTxIds.set(tx.mId);
     }
 
     void
     merge(Cluster const& other)
     {
         mInstructions += other.mInstructions;
-        mReadOnlyEntries.inplaceUnion(other.mReadOnlyEntries);
-        mReadWriteEntries.inplaceUnion(other.mReadWriteEntries);
+        mConflictTxs.inplaceUnion(other.mConflictTxs);
         mTxIds.inplaceUnion(other.mTxIds);
     }
 };
@@ -129,14 +115,12 @@ class Stage
         auto conflictingClusters = getConflictingClusters(tx);
 
         bool packed = false;
-        // Then, create new clusters by merging the conflicting clusters
+        // Then, try creating new clusters by merging the conflicting clusters
         // together and adding the new transaction to the resulting cluster.
         auto newClusters = createNewClusters(tx, conflictingClusters, packed);
-        releaseAssert(!newClusters.empty());
-
-        // If the new cluster exceeds the limit of instructions per cluster,
-        // we can't add the transaction.
-        if (newClusters.back().mInstructions > mConfig.mInstructionsPerCluster)
+        // Fail fast if a new cluster will end up too large to fit into the
+        // stage.
+        if (newClusters.empty())
         {
             return false;
         }
@@ -175,9 +159,9 @@ class Stage
         for (auto const& cluster : mClusters)
         {
             size_t txId = 0;
-            while (cluster.mTxIds.nextSet(txId))
+            while (cluster->mTxIds.nextSet(txId))
             {
-                visitor(cluster.mBinId, txId);
+                visitor(cluster->mBinId, txId);
                 ++txId;
             }
         }
@@ -188,49 +172,38 @@ class Stage
     getConflictingClusters(BuilderTx const& tx) const
     {
         std::unordered_set<Cluster const*> conflictingClusters;
-        for (Cluster const& cluster : mClusters)
+        for (auto const& cluster : mClusters)
         {
-            bool isConflicting = tx.mReadOnlyFootprint.intersectionCount(
-                                     cluster.mReadWriteEntries) > 0 ||
-                                 tx.mReadWriteFootprint.intersectionCount(
-                                     cluster.mReadOnlyEntries) > 0 ||
-                                 tx.mReadWriteFootprint.intersectionCount(
-                                     cluster.mReadWriteEntries) > 0;
-            if (isConflicting)
+            if (cluster->mConflictTxs.get(tx.mId))
             {
-                conflictingClusters.insert(&cluster);
+                conflictingClusters.insert(cluster.get());
             }
         }
         return conflictingClusters;
     }
 
-    std::vector<Cluster>
+    std::vector<std::shared_ptr<Cluster const>>
    createNewClusters(BuilderTx const& tx,
                       std::unordered_set<Cluster const*> const& txConflicts,
                       bool& packed)
     {
-        std::vector<Cluster> newClusters;
-        newClusters.reserve(mClusters.size());
-        for (auto const& cluster : mClusters)
-        {
-            if (txConflicts.find(&cluster) == txConflicts.end())
-            {
-                newClusters.push_back(cluster);
-            }
-        }
-
-        newClusters.emplace_back(tx);
+        int64_t newInstructions = tx.mInstructions;
         for (auto const* cluster : txConflicts)
         {
-            newClusters.back().merge(*cluster);
+            newInstructions += cluster->mInstructions;
         }
+
         // Fast-fail condition to ensure that the new cluster doesn't exceed
         // the instructions limit.
-        if (newClusters.back().mInstructions > mConfig.mInstructionsPerCluster)
+        if (newInstructions > mConfig.mInstructionsPerCluster)
         {
-            return newClusters;
+            return {};
+        }
+        auto newCluster = std::make_shared<Cluster>(tx);
+        for (auto const* cluster : txConflicts)
+        {
+            newCluster->merge(*cluster);
         }
-
         // Remove the clusters that were merged from their respective bins.
         for (auto const& cluster : txConflicts)
         {
@@ -244,16 +217,27 @@ class Stage
         // the bin-packing from scratch.
         for (size_t binId = 0; binId < mConfig.mClustersPerStage; ++binId)
         {
-            if (mBinInstructions[binId] + newClusters.back().mInstructions <=
+            if (mBinInstructions[binId] + newCluster->mInstructions <=
                 mConfig.mInstructionsPerCluster)
             {
-                mBinInstructions[binId] += newClusters.back().mInstructions;
-                mBinPacking[binId].inplaceUnion(newClusters.back().mTxIds);
-                newClusters.back().mBinId = binId;
+                mBinInstructions[binId] += newCluster->mInstructions;
+                mBinPacking[binId].inplaceUnion(newCluster->mTxIds);
+                newCluster->mBinId = binId;
                 packed = true;
                 break;
             }
         }
+
+        std::vector<std::shared_ptr<Cluster const>> newClusters;
+        newClusters.reserve(mClusters.size() + 1 - txConflicts.size());
+        for (auto const& cluster : mClusters)
+        {
+            if (txConflicts.find(cluster.get()) == txConflicts.end())
+            {
+                newClusters.push_back(cluster);
+            }
+        }
+        newClusters.push_back(newCluster);
         // If we couldn't pack the new cluster without full bin-packing, we
         // recover the state of the bins (so that the transaction is not
         // considered to have been added yet).
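Worth noting how the shared-pointer change removes the copying the old createNewClusters did: since clusters are held as shared_ptr<Cluster const>, the rebuilt list can simply alias the untouched clusters, and only the merged cluster is freshly allocated. A minimal sketch of that rebuild pattern with simplified types — this is an illustration of the idea, not the actual stellar-core API:

#include <cstdint>
#include <memory>
#include <unordered_set>
#include <vector>

// Simplified cluster: the real code also carries conflict/tx-id bitsets.
struct Cluster
{
    uint64_t mInstructions = 0;
    void
    merge(Cluster const& other)
    {
        mInstructions += other.mInstructions;
    }
};

using ClusterPtr = std::shared_ptr<Cluster const>;

// Rebuild the cluster list after merging: untouched clusters are shared
// (a pointer copy), and only the single merged cluster is allocated.
std::vector<ClusterPtr>
rebuildClusters(std::vector<ClusterPtr> const& clusters,
                std::unordered_set<Cluster const*> const& conflicting,
                Cluster seed)
{
    auto merged = std::make_shared<Cluster>(seed);
    for (auto const* c : conflicting)
    {
        merged->merge(*c);
    }
    std::vector<ClusterPtr> result;
    result.reserve(clusters.size() + 1 - conflicting.size());
    for (auto const& c : clusters)
    {
        if (conflicting.count(c.get()) == 0)
        {
            result.push_back(c); // shared, not deep-copied
        }
    }
    result.push_back(merged);
    return result;
}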
@@ -273,31 +257,33 @@ class Stage
     // This has around 11/9 maximum approximation ratio, which probably has
     // the best complexity/performance tradeoff out of all the heuristics.
     std::vector<BitSet>
-    binPacking(std::vector<Cluster>& clusters,
+    binPacking(std::vector<std::shared_ptr<Cluster const>>& clusters,
               std::vector<uint64_t>& binInsns) const
     {
         // We could consider dropping the sort here in order to save some time
         // and using just the first-fit heuristic, but that also raises the
         // approximation ratio to 1.7.
         std::sort(clusters.begin(), clusters.end(),
                   [](auto const& a, auto const& b) {
-                      return a.mInstructions > b.mInstructions;
+                      return a->mInstructions > b->mInstructions;
                   });
         size_t const binCount = mConfig.mClustersPerStage;
         std::vector<BitSet> bins(binCount);
         binInsns.resize(binCount);
+        std::vector<size_t> newBinId(clusters.size());
         // Just add every cluster into the first bin it fits into.
-        for (auto& cluster : clusters)
+        for (size_t clusterId = 0; clusterId < clusters.size(); ++clusterId)
         {
+            auto const& cluster = clusters[clusterId];
             bool packed = false;
             for (size_t i = 0; i < binCount; ++i)
             {
-                if (binInsns[i] + cluster.mInstructions <=
+                if (binInsns[i] + cluster->mInstructions <=
                     mConfig.mInstructionsPerCluster)
                 {
-                    binInsns[i] += cluster.mInstructions;
-                    bins[i].inplaceUnion(cluster.mTxIds);
-                    cluster.mBinId = i;
+                    binInsns[i] += cluster->mInstructions;
+                    bins[i].inplaceUnion(cluster->mTxIds);
+                    newBinId[clusterId] = i;
                     packed = true;
                     break;
                 }
@@ -307,10 +293,14 @@ class Stage
                 return std::vector<BitSet>();
             }
         }
+        for (size_t clusterId = 0; clusterId < clusters.size(); ++clusterId)
+        {
+            clusters[clusterId]->mBinId = newBinId[clusterId];
+        }
         return bins;
     }
 
-    std::vector<Cluster> mClusters;
+    std::vector<std::shared_ptr<Cluster const>> mClusters;
     std::vector<BitSet> mBinPacking;
     std::vector<uint64_t> mBinInstructions;
     int64_t mInstructions = 0;
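For reference, the heuristic binPacking implements is first-fit decreasing. A self-contained sketch with plain integer weights — the function name and signature here are made up for illustration, not taken from the codebase:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// First-fit decreasing: sort weights descending, then drop each one into the
// first bin with room. The sort is what buys the ~11/9 approximation ratio
// mentioned above; plain first-fit is only ~1.7.
// Returns per-bin loads, or an empty vector if some weight doesn't fit.
std::vector<uint64_t>
firstFitDecreasing(std::vector<uint64_t> weights, size_t binCount,
                   uint64_t capacity)
{
    std::sort(weights.begin(), weights.end(), std::greater<uint64_t>());
    std::vector<uint64_t> binLoads(binCount, 0);
    for (uint64_t w : weights)
    {
        bool packed = false;
        for (auto& load : binLoads)
        {
            if (load + w <= capacity)
            {
                load += w;
                packed = true;
                break;
            }
        }
        if (!packed)
        {
            return {}; // caller rejects or retries, as in binPacking above
        }
    }
    return binLoads;
}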
@@ -327,35 +317,79 @@ buildSurgePricedParallelSorobanPhase(
     std::vector<bool>& hadTxNotFittingLane)
 {
     ZoneScoped;
-    // Map all the entries in the footprints to integers in order to be able to
-    // use the bitset operations.
-    UnorderedMap<LedgerKey, size_t> entryIdMap;
-    auto addToMap = [&entryIdMap](LedgerKey const& key) {
-        auto sz = entryIdMap.size();
-        entryIdMap.emplace(key, sz);
-    };
-    for (auto const& txFrame : txFrames)
+    // Simplify the transactions to the minimum necessary amount of data.
+    std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx const*>
+        builderTxForTx;
+    std::vector<std::unique_ptr<BuilderTx>> builderTxs;
+    builderTxs.reserve(txFrames.size());
+    for (size_t i = 0; i < txFrames.size(); ++i)
     {
+        auto const& txFrame = txFrames[i];
+        builderTxs.emplace_back(std::make_unique<BuilderTx>(i, *txFrame));
+        builderTxForTx.emplace(txFrame, builderTxs.back().get());
+    }
+
+    // Before trying to include any transactions, find all the pairs of the
+    // conflicting transactions and mark the conflicts in the builderTxs.
+    //
+    // In order to find the conflicts, we build the maps from the footprint
+    // keys to transactions, then mark the conflicts between the transactions
+    // that share RW key, or between the transactions that share RO and RW key.
+    //
+    // The approach here is optimized towards the low number of conflicts,
+    // specifically when there are no conflicts at all, the complexity is just
+    // O(total_footprint_entry_count). The worst case is roughly
+    // O(max_tx_footprint_size * transaction_count ^ 2), which is equivalent
+    // to the complexity of the straightforward approach of iterating over all
+    // the transaction pairs.
+    //
+    // This also has the further optimization potential: we could populate the
+    // key maps and even the conflicting transactions eagerly in tx queue, thus
+    // amortizing the costs across the whole ledger duration.
+    UnorderedMap<LedgerKey, std::vector<size_t>> txsWithRoKey;
+    UnorderedMap<LedgerKey, std::vector<size_t>> txsWithRwKey;
+    for (size_t i = 0; i < txFrames.size(); ++i)
+    {
+        auto const& txFrame = txFrames[i];
         auto const& footprint = txFrame->sorobanResources().footprint;
         for (auto const& key : footprint.readOnly)
         {
-            addToMap(key);
+            txsWithRoKey[key].push_back(i);
         }
         for (auto const& key : footprint.readWrite)
         {
-            addToMap(key);
+            txsWithRwKey[key].push_back(i);
         }
     }
 
-    // Simplify the transactions to the minimum necessary amount of data.
-    std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx> builderTxForTx;
-    for (size_t i = 0; i < txFrames.size(); ++i)
+    for (auto const& [key, rwTxIds] : txsWithRwKey)
     {
-        auto const& txFrame = txFrames[i];
-        builderTxForTx.emplace(txFrame, BuilderTx(i, *txFrame, entryIdMap));
+        // RW-RW conflicts
+        for (size_t i = 0; i < rwTxIds.size(); ++i)
+        {
+            for (size_t j = i + 1; j < rwTxIds.size(); ++j)
+            {
+                builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
+                builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]);
+            }
+        }
+        // RO-RW conflicts
+        auto roIt = txsWithRoKey.find(key);
+        if (roIt != txsWithRoKey.end())
+        {
+            auto const& roTxIds = roIt->second;
+            for (size_t i = 0; i < roTxIds.size(); ++i)
+            {
+                for (size_t j = 0; j < rwTxIds.size(); ++j)
+                {
+                    builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
+                    builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]);
+                }
+            }
+        }
    }
 
-    // Process the transactions in the surge pricing (drecreasing fee) order.
+    // Process the transactions in the surge pricing (decreasing fee) order.
     // This also automatically ensures that the resource limits are respected
     // for all the dimensions besides instructions.
     SurgePricingPriorityQueue queue(
@@ -378,7 +412,7 @@ buildSurgePricedParallelSorobanPhase(
         releaseAssert(builderTxIt != builderTxForTx.end());
         for (auto& stage : stages)
         {
-            if (stage.tryAdd(builderTxIt->second))
+            if (stage.tryAdd(*builderTxIt->second))
             {
                 added = true;
                 break;

0 commit comments
