@@ -15,6 +15,7 @@ use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
15
15
use crate :: config:: Config ;
16
16
use crate :: daemon:: Daemon ;
17
17
use crate :: errors:: * ;
18
+ use crate :: error_chain:: ChainedError ;
18
19
use crate :: metrics:: { GaugeVec , HistogramOpts , HistogramVec , MetricOpts , Metrics } ;
19
20
use crate :: new_index:: {
20
21
compute_script_hash, schema:: FullHash , ChainQuery , FundingInfo , GetAmountVal , ScriptStats ,
@@ -280,73 +281,6 @@ impl Mempool {
280
281
return HashSet :: from_iter ( self . txstore . keys ( ) . cloned ( ) ) ;
281
282
}
282
283
283
- pub fn download_new_mempool_txs ( daemon : & Daemon , old_txids : & HashSet < Txid > , new_txids : & HashSet < Txid > ) -> Vec < Transaction > {
284
- let txids: Vec < & Txid > = ( * new_txids) . difference ( old_txids) . collect ( ) ;
285
- let tranactions = match daemon. gettransactions ( & txids) {
286
- Ok ( txs) => txs,
287
- Err ( err) => {
288
- warn ! ( "failed to get {} transactions: {}" , txids. len( ) , err) ; // e.g. new block or RBF
289
- vec ! [ ] // return an empty vector if there's an error
290
- }
291
- } ;
292
-
293
- return tranactions;
294
- }
295
-
296
- pub fn update_quick ( & mut self , to_add : & Vec < Transaction > , to_remove : & HashSet < & Txid > ) -> Result < ( ) > {
297
- let _timer = self . latency . with_label_values ( & [ "update_quick" ] ) . start_timer ( ) ;
298
-
299
- // Add new transactions
300
- self . add ( to_add. clone ( ) ) ;
301
- // Remove missing transactions
302
- self . remove ( to_remove. clone ( ) ) ;
303
-
304
- self . count
305
- . with_label_values ( & [ "txs" ] )
306
- . set ( self . txstore . len ( ) as f64 ) ;
307
-
308
- // Update cached backlog stats (if expired)
309
- if self . backlog_stats . 1 . elapsed ( ) > Duration :: from_secs ( BACKLOG_STATS_TTL ) {
310
- self . update_backlog_stats ( ) ;
311
- }
312
-
313
- Ok ( ( ) )
314
- }
315
-
316
- pub fn update ( & mut self , daemon : & Daemon ) -> Result < ( ) > {
317
- let _timer = self . latency . with_label_values ( & [ "update" ] ) . start_timer ( ) ;
318
- let new_txids = daemon
319
- . getmempooltxids ( )
320
- . chain_err ( || "failed to update mempool from daemon" ) ?;
321
- let old_txids = HashSet :: from_iter ( self . txstore . keys ( ) . cloned ( ) ) ;
322
- let to_remove: HashSet < & Txid > = old_txids. difference ( & new_txids) . collect ( ) ;
323
-
324
- // Download and add new transactions from bitcoind's mempool
325
- let txids: Vec < & Txid > = new_txids. difference ( & old_txids) . collect ( ) ;
326
- let to_add = match daemon. gettransactions ( & txids) {
327
- Ok ( txs) => txs,
328
- Err ( err) => {
329
- warn ! ( "failed to get {} transactions: {}" , txids. len( ) , err) ; // e.g. new block or RBF
330
- return Ok ( ( ) ) ; // keep the mempool until next update()
331
- }
332
- } ;
333
- // Add new transactions
334
- self . add ( to_add) ;
335
- // Remove missing transactions
336
- self . remove ( to_remove) ;
337
-
338
- self . count
339
- . with_label_values ( & [ "txs" ] )
340
- . set ( self . txstore . len ( ) as f64 ) ;
341
-
342
- // Update cached backlog stats (if expired)
343
- if self . backlog_stats . 1 . elapsed ( ) > Duration :: from_secs ( BACKLOG_STATS_TTL ) {
344
- self . update_backlog_stats ( ) ;
345
- }
346
-
347
- Ok ( ( ) )
348
- }
349
-
350
284
pub fn update_backlog_stats ( & mut self ) {
351
285
let _timer = self
352
286
. latency
@@ -563,60 +497,47 @@ impl Mempool {
563
497
old_txids : & HashSet < Txid > ,
564
498
new_txids : & HashSet < Txid >
565
499
) -> Result < Vec < Transaction > > {
566
- let t = Instant :: now ( ) ;
567
-
568
500
let txids: Vec < & Txid > = ( * new_txids) . difference ( old_txids) . collect ( ) ;
569
- let transactions = match daemon. gettransactions ( & txids) {
570
- Ok ( txs) => txs,
571
- Err ( err) => {
572
- warn ! ( "failed to get {} transactions: {}" , txids. len( ) , err) ; // e.g. new block or RBF
573
- return Err ( err) ;
574
- }
575
- } ;
576
-
577
- log_fn_duration ( "mempool::download_new_mempool_txs" , t. elapsed ( ) . as_micros ( ) ) ;
501
+ let transactions = daemon. gettransactions ( & txids)
502
+ . chain_err ( || format ! ( "failed to get {} transactions" , txids. len( ) ) ) ?;
578
503
return Ok ( transactions) ;
579
504
}
580
505
581
506
pub fn update ( mempool : & Arc < RwLock < Mempool > > , daemon : & Daemon ) -> Result < ( ) > {
582
- let t = Instant :: now ( ) ;
583
-
584
- // 1. Determine which transactions are no longer in the daemon's mempool
507
+ // 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it
585
508
let old_txids = mempool. read ( ) . unwrap ( ) . old_txids ( ) ;
586
509
let new_txids = daemon
587
510
. getmempooltxids ( )
588
511
. chain_err ( || "failed to update mempool from daemon" ) ?;
589
- let old_mempool_txs: HashSet < & Txid > = old_txids. difference ( & new_txids) . collect ( ) ;
590
- log_fn_duration ( "mempool::paratial_tx_fetch" , t. elapsed ( ) . as_micros ( ) ) ;
512
+ let to_remove: HashSet < & Txid > = old_txids. difference ( & new_txids) . collect ( ) ;
591
513
592
- // 2. Download new transactions from the daemon's mempool
593
- let new_mempool_txs = match Mempool :: download_new_mempool_txs ( & daemon, & old_txids, & new_txids) {
514
+ // 2. Download the new transactions from the daemon's mempool
515
+ let to_add = match Mempool :: download_new_mempool_txs ( & daemon, & old_txids, & new_txids) {
594
516
Ok ( txs) => txs,
595
- Err ( _ ) => {
596
- warn ! ( "Failed to get new mempool txs, skipping mempool update" ) ;
517
+ Err ( err ) => {
518
+ warn ! ( "Unable to get new mempool txs, skipping mempool update: {}" , err . display_chain ( ) ) ;
597
519
return Ok ( ( ) ) ;
598
520
}
599
521
} ;
600
522
601
523
// 3. Update local mempool to match daemon's state
602
524
{
603
- let mut mempool_guard = mempool. write ( ) . unwrap ( ) ;
525
+ let mut mempool = mempool. write ( ) . unwrap ( ) ;
604
526
// Add new transactions
605
- mempool_guard . add ( new_mempool_txs . clone ( ) ) ;
527
+ mempool . add ( to_add ) ;
606
528
// Remove missing transactions
607
- mempool_guard . remove ( old_mempool_txs . clone ( ) ) ;
529
+ mempool . remove ( to_remove ) ;
608
530
609
- mempool_guard . count
531
+ mempool . count
610
532
. with_label_values ( & [ "txs" ] )
611
- . set ( mempool_guard . txstore . len ( ) as f64 ) ;
533
+ . set ( mempool . txstore . len ( ) as f64 ) ;
612
534
613
535
// Update cached backlog stats (if expired)
614
- if mempool_guard . backlog_stats . 1 . elapsed ( ) > Duration :: from_secs ( BACKLOG_STATS_TTL ) {
615
- mempool_guard . update_backlog_stats ( ) ;
536
+ if mempool . backlog_stats . 1 . elapsed ( ) > Duration :: from_secs ( BACKLOG_STATS_TTL ) {
537
+ mempool . update_backlog_stats ( ) ;
616
538
}
617
539
}
618
540
619
- log_fn_duration ( "mempool::update" , t. elapsed ( ) . as_micros ( ) ) ;
620
541
Ok ( ( ) )
621
542
}
622
543
}
0 commit comments