Skip to content

Commit bc795f9

Browse files
authored
Merge branch 'mempool' into docker_build
2 parents 5dfe714 + 46cd53c commit bc795f9

File tree

4 files changed

+304
-71
lines changed

4 files changed

+304
-71
lines changed

src/new_index/mempool.rs

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,7 @@ impl Mempool {
206206
TxHistoryInfo::Funding(info) => {
207207
// Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index.
208208
#[cfg(feature = "liquid")]
209-
let txo = self
210-
.lookup_txo(&entry.get_funded_outpoint())
211-
.expect("missing txo");
209+
let txo = self.lookup_txo(&entry.get_funded_outpoint())?;
212210

213211
Some(Utxo {
214212
txid: deserialize(&info.txid).expect("invalid txid"),
@@ -345,7 +343,9 @@ impl Mempool {
345343
}
346344
};
347345
// Add new transactions
348-
self.add(to_add)?;
346+
if to_add.len() > self.add(to_add) {
347+
debug!("Mempool update added less transactions than expected");
348+
}
349349
// Remove missing transactions
350350
self.remove(to_remove);
351351

@@ -370,39 +370,66 @@ impl Mempool {
370370
pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
371371
if self.txstore.get(txid).is_none() {
372372
if let Ok(tx) = daemon.getmempooltx(txid) {
373-
self.add(vec![tx])?;
373+
if self.add(vec![tx]) == 0 {
374+
return Err(format!(
375+
"Unable to add {txid} to mempool likely due to missing parents."
376+
)
377+
.into());
378+
}
374379
}
375380
}
376-
377381
Ok(())
378382
}
379383

380-
fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
384+
/// Add transactions to the mempool.
385+
///
386+
/// The return value is the number of transactions processed.
387+
#[must_use = "Must deal with [[input vec's length]] > [[result]]."]
388+
fn add(&mut self, txs: Vec<Transaction>) -> usize {
381389
self.delta
382390
.with_label_values(&["add"])
383391
.observe(txs.len() as f64);
384392
let _timer = self.latency.with_label_values(&["add"]).start_timer();
393+
let txlen = txs.len();
394+
if txlen == 0 {
395+
return 0;
396+
}
397+
debug!("Adding {} transactions to Mempool", txlen);
385398

386-
let mut txids = vec![];
399+
let mut txids = Vec::with_capacity(txs.len());
387400
// Phase 1: add to txstore
388401
for tx in txs {
389402
let txid = tx.txid();
390403
txids.push(txid);
391404
self.txstore.insert(txid, tx);
392405
}
393-
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
394-
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
395-
Ok(txos) => txos,
396-
Err(err) => {
397-
warn!("lookup txouts failed: {}", err);
398-
// TODO: should we remove txids from txstore?
399-
return Ok(());
400-
}
401-
};
406+
407+
// Phase 2: index history and spend edges (some txos can be missing)
408+
let txos = self.lookup_txos(&self.get_prevouts(&txids));
409+
410+
// Count how many transactions were actually processed.
411+
let mut processed_count = 0;
412+
413+
// Phase 3: Iterate over the transactions and do the following:
414+
// 1. Find all of the TxOuts of each input parent using `txos`
415+
// 2. If any parent wasn't found, skip parsing this transaction
416+
// 3. Insert TxFeeInfo into info.
417+
// 4. Push TxOverview into recent tx queue.
418+
// 5. Create the Spend and Fund TxHistory structs for inputs + outputs
419+
// 6. Insert all TxHistory into history.
420+
// 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin)))
421+
// 8. (Liquid only) Parse assets of tx.
402422
for txid in txids {
403-
let tx = self.txstore.get(&txid).expect("missing mempool tx");
423+
let tx = self.txstore.get(&txid).expect("missing tx from txstore");
424+
425+
let prevouts = match extract_tx_prevouts(tx, &txos, false) {
426+
Ok(v) => v,
427+
Err(e) => {
428+
warn!("Skipping tx {txid} missing parent error: {e}");
429+
continue;
430+
}
431+
};
404432
let txid_bytes = full_hash(&txid[..]);
405-
let prevouts = extract_tx_prevouts(tx, &txos, false)?;
406433

407434
// Get feeinfo for caching and recent tx overview
408435
let feeinfo = TxFeeInfo::new(tx, &prevouts, self.config.network_type);
@@ -472,18 +499,26 @@ impl Mempool {
472499
&mut self.asset_history,
473500
&mut self.asset_issuance,
474501
);
502+
503+
processed_count += 1;
475504
}
476505

477-
Ok(())
506+
processed_count
478507
}
479508

480-
pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
509+
/// Returns None if the lookup fails (mempool transaction RBF-ed etc.)
510+
pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
481511
let mut outpoints = BTreeSet::new();
482512
outpoints.insert(*outpoint);
483-
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
513+
// This can possibly be None now
514+
self.lookup_txos(&outpoints).remove(outpoint)
484515
}
485516

486-
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
517+
/// For a given set of OutPoints, return a HashMap<OutPoint, TxOut>
518+
///
519+
/// Not all OutPoints from mempool transactions are guaranteed to be there.
520+
/// Ensure you deal with the None case in your logic.
521+
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
487522
let _timer = self
488523
.latency
489524
.with_label_values(&["lookup_txos"])
@@ -494,18 +529,21 @@ impl Mempool {
494529
let mempool_txos = outpoints
495530
.iter()
496531
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
497-
.map(|outpoint| {
532+
.flat_map(|outpoint| {
498533
self.txstore
499534
.get(&outpoint.txid)
500535
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
501536
.map(|txout| (*outpoint, txout))
502-
.chain_err(|| format!("missing outpoint {:?}", outpoint))
537+
.or_else(|| {
538+
warn!("missing outpoint {:?}", outpoint);
539+
None
540+
})
503541
})
504-
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
542+
.collect::<HashMap<OutPoint, TxOut>>();
505543

506544
let mut txos = confirmed_txos;
507545
txos.extend(mempool_txos);
508-
Ok(txos)
546+
txos
509547
}
510548

511549
fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {

src/new_index/query.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,19 @@ impl Query {
7171

7272
pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
7373
let txid = self.daemon.broadcast_raw(txhex)?;
74-
self.mempool
74+
// The important part is whether we succeeded in broadcasting.
75+
// Ignore errors in adding to the cache and show an internal warning.
76+
if let Err(e) = self
77+
.mempool
7578
.write()
7679
.unwrap()
77-
.add_by_txid(&self.daemon, &txid)?;
80+
.add_by_txid(&self.daemon, &txid)
81+
{
82+
warn!(
83+
"broadcast_raw of {txid} succeeded to broadcast \
84+
but failed to add to mempool-electrs Mempool cache: {e}"
85+
);
86+
}
7887
Ok(txid)
7988
}
8089

@@ -118,7 +127,7 @@ impl Query {
118127
.or_else(|| self.mempool().lookup_raw_txn(txid))
119128
}
120129

121-
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
130+
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
122131
// the mempool lookup_txos() internally looks up confirmed txos as well
123132
self.mempool().lookup_txos(outpoints)
124133
}

src/new_index/schema.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ impl Indexer {
292292
}
293293

294294
fn add(&self, blocks: &[BlockEntry]) {
295+
debug!("Adding {} blocks to Indexer", blocks.len());
295296
// TODO: skip orphaned blocks?
296297
let rows = {
297298
let _timer = self.start_timer("add_process");
@@ -310,6 +311,7 @@ impl Indexer {
310311
}
311312

312313
fn index(&self, blocks: &[BlockEntry]) {
314+
debug!("Indexing {} blocks with Indexer", blocks.len());
313315
let previous_txos_map = {
314316
let _timer = self.start_timer("index_lookup");
315317
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
@@ -371,6 +373,34 @@ impl ChainQuery {
371373
}
372374
}
373375

376+
pub fn get_block_txs(&self, hash: &BlockHash) -> Option<Vec<Transaction>> {
377+
let _timer = self.start_timer("get_block_txs");
378+
379+
let txids: Option<Vec<Txid>> = if self.light_mode {
380+
// TODO fetch block as binary from REST API instead of as hex
381+
let mut blockinfo = self.daemon.getblock_raw(hash, 1).ok()?;
382+
Some(serde_json::from_value(blockinfo["tx"].take()).unwrap())
383+
} else {
384+
self.store
385+
.txstore_db
386+
.get(&BlockRow::txids_key(full_hash(&hash[..])))
387+
.map(|val| bincode::deserialize(&val).expect("failed to parse block txids"))
388+
};
389+
390+
txids.and_then(|txid_vec| {
391+
let mut transactions = Vec::with_capacity(txid_vec.len());
392+
393+
for txid in txid_vec {
394+
match self.lookup_txn(&txid, Some(hash)) {
395+
Some(transaction) => transactions.push(transaction),
396+
None => return None,
397+
}
398+
}
399+
400+
Some(transactions)
401+
})
402+
}
403+
374404
pub fn get_block_meta(&self, hash: &BlockHash) -> Option<BlockMeta> {
375405
let _timer = self.start_timer("get_block_meta");
376406

0 commit comments

Comments
 (0)