Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ impl Daemon {
/// Fetch the given transactions in parallel over multiple threads and RPC connections,
/// ignoring any missing ones and returning whatever is available.
#[trace]
pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<HashMap<Txid, Transaction>> {
const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;

let params_list: Vec<Value> = txids
Expand Down
171 changes: 92 additions & 79 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,99 +516,112 @@ impl Mempool {
daemon: &Daemon,
tip: &BlockHash,
) -> Result<bool> {
let _timer = mempool
.read()
.unwrap()
.latency
.with_label_values(&["update"])
.start_timer();
let (_timer, count) = {
let mempool = mempool.read().unwrap();
let timer = mempool.latency.with_label_values(&["update"]).start_timer();
(timer, mempool.count.clone())
};

// Continuously attempt to fetch mempool transactions until we're able to get them in full
let mut fetched_txs = HashMap::<Txid, Transaction>::new();
let mut indexed_txids = mempool.read().unwrap().txids_set();
loop {
// Get bitcoind's current list of mempool txids
let all_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;

// Remove evicted mempool transactions
mempool
.write()
.unwrap()
.remove(indexed_txids.difference(&all_txids).collect());

indexed_txids.retain(|txid| all_txids.contains(txid));
fetched_txs.retain(|txid, _| all_txids.contains(txid));

// Fetch missing transactions from bitcoind
let new_txids = all_txids
.iter()
.filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
.collect::<Vec<_>>();
if new_txids.is_empty() {
break;
}
debug!(
"mempool with total {} txs, {} fetched, {} missing",
all_txids.len(),
indexed_txids.len() + fetched_txs.len(),
new_txids.len()
);
// Get bitcoind's current list of mempool txids
let bitcoind_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;

// Get the list of mempool txids in the local mempool view
let indexed_txids = mempool.read().unwrap().txids_set();

// Remove evicted mempool transactions from the local mempool view
let evicted_txids = indexed_txids
.difference(&bitcoind_txids)
.collect::<HashSet<_>>();
if !evicted_txids.is_empty() {
mempool.write().unwrap().remove(evicted_txids);
} // avoids acquiring a lock when there are no evictions

// Find transactions available in bitcoind's mempool but not indexed locally
let new_txids = bitcoind_txids
.difference(&indexed_txids)
.collect::<Vec<_>>();

debug!(
"mempool with total {} txs, {} indexed locally, {} to fetch",
bitcoind_txids.len(),
indexed_txids.len(),
new_txids.len()
);
count
.with_label_values(&["all_txs"])
.set(bitcoind_txids.len() as f64);
count
.with_label_values(&["indexed_txs"])
.set(indexed_txids.len() as f64);
count
.with_label_values(&["missing_txs"])
.set(new_txids.len() as f64);

if new_txids.is_empty() {
return Ok(true);
}

{
let mempool = mempool.read().unwrap();

mempool
.count
.with_label_values(&["all_txs"])
.set(all_txids.len() as f64);
mempool
.count
.with_label_values(&["fetched_txs"])
.set((indexed_txids.len() + fetched_txs.len()) as f64);
mempool
.count
.with_label_values(&["missing_txs"])
.set(new_txids.len() as f64);
}
// Fetch missing transactions from bitcoind
let mut fetched_txs = daemon.gettransactions_available(&new_txids)?;

let new_txs = daemon.gettransactions_available(&new_txids)?;
// Abort if the chain tip moved while fetching transactions
if daemon.getbestblockhash()? != *tip {
warn!("chain tip moved while updating mempool");
return Ok(false);
}

// Abort if the chain tip moved while fetching transactions
if daemon.getbestblockhash()? != *tip {
warn!("chain tip moved while updating mempool");
return Ok(false);
}
// Find which transactions were requested but are no longer available in bitcoind's mempool,
// typically due to Replace-By-Fee (or mempool eviction for some other reason) taking place
// between querying for the mempool txids and querying for the transactions themselves.
let mut replaced_txids: HashSet<_> = new_txids
.into_iter()
.filter(|txid| !fetched_txs.contains_key(*txid))
.cloned()
.collect();

let fetched_count = new_txs.len();
fetched_txs.extend(new_txs);
if replaced_txids.is_empty() {
trace!("fetched complete mempool snapshot");
} else {
warn!(
"could not to fetch {} replaced/evicted mempool transactions: {:?}",
replaced_txids.len(),
replaced_txids.iter().take(10).collect::<Vec<_>>()
);
}

// Retry if any transactions were evicted form the mempool before we managed to get them
if fetched_count != new_txids.len() {
warn!(
"failed to fetch {} mempool txs, retrying...",
new_txids.len() - fetched_count
);
let missing_txids: Vec<_> = new_txids
// If we were unable to get a complete consistent snapshot of the bitcoind mempool,
// detect and remove any transactions that spend from the missing (replaced) transactions
// or any of their descendants. This is necessary because it could be possible to fetch the
// child tx successfully before the parent is replaced, but miss the replaced parent tx.
while !replaced_txids.is_empty() {
let mut descendants_txids = HashSet::new();
fetched_txs.retain(|txid, tx| {
let parent_was_replaced = tx
.input
.iter()
.filter(|txid| !fetched_txs.contains_key(**txid))
.take(10)
.collect();
warn!("missing mempool txids: {:?} (capped at 10)", missing_txids);
} else {
break;
}
.any(|txin| replaced_txids.contains(&txin.previous_output.txid));
if parent_was_replaced {
descendants_txids.insert(*txid);
}
!parent_was_replaced
});
trace!(
"detected {} replaced mempool descendants",
descendants_txids.len()
);
replaced_txids = descendants_txids;
}

// Add fetched transactions to our view of the mempool
{
trace!("indexing {} new mempool transactions", fetched_txs.len());
if !fetched_txs.is_empty() {
let mut mempool = mempool.write().unwrap();

mempool.add(fetched_txs)?;

mempool
.count
count
.with_label_values(&["txs"])
.set(mempool.txstore.len() as f64);

Expand Down
Loading