Skip to content

Commit 22573f6

Browse files
committed
[ft] Replenish > 1000 lastrevealed with SMT
[CI] Cargo fmt applied [Req] Remove used of mutex and instead use returned results. [std/non-std] variations [fmt] Cargo fmt [Clean] dedup'd code [Clean] Cargo Fmt repair
1 parent eeedb4e commit 22573f6

File tree

2 files changed

+94
-11
lines changed

2 files changed

+94
-11
lines changed

crates/chain/src/indexer/keychain_txout.rs

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ use crate::{
99
spk_txout::SpkTxOutIndex,
1010
DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator,
1111
};
12-
use alloc::{borrow::ToOwned, vec::Vec};
12+
use alloc::{borrow::ToOwned, sync::Arc, vec::Vec};
1313
use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid};
1414
use core::{
1515
fmt::Debug,
1616
ops::{Bound, RangeBounds},
1717
};
18+
#[cfg(feature = "std")]
19+
use std::thread;
1820

1921
use crate::Merge;
2022

@@ -445,6 +447,8 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
445447
}
446448

447449
/// Syncs the state of the inner spk index after changes to a keychain
450+
/// Note: Uses multithreading to parallelize the derivation of script pubkeys when processing
451+
/// more than 1000 indices (only when the std feature is enabled)
448452
fn replenish_inner_index(&mut self, did: DescriptorId, keychain: &K, lookahead: u32) {
449453
let descriptor = self.descriptors.get(&did).expect("invariant");
450454
let next_store_index = self
@@ -454,13 +458,92 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
454458
.last()
455459
.map_or(0, |((_, index), _)| *index + 1);
456460
let next_reveal_index = self.last_revealed.get(&did).map_or(0, |v| *v + 1);
457-
for (new_index, new_spk) in
458-
SpkIterator::new_with_range(descriptor, next_store_index..next_reveal_index + lookahead)
461+
let end_index = next_reveal_index + lookahead;
462+
463+
if next_store_index >= end_index {
464+
return;
465+
}
466+
467+
let total_indices = end_index - next_store_index;
468+
469+
// Helper function to generate script pubkeys in a single-threaded manner
470+
let generate_spks = |descriptor: &Descriptor<DescriptorPublicKey>,
471+
start: u32,
472+
end: u32|
473+
-> Vec<(u32, ScriptBuf)> {
474+
let mut results = Vec::with_capacity((end - start) as usize);
475+
for (new_index, new_spk) in SpkIterator::new_with_range(descriptor, start..end) {
476+
results.push((new_index, new_spk));
477+
}
478+
results
479+
};
480+
481+
let process_results = |this: &mut Self, results: Vec<(u32, ScriptBuf)>| {
482+
for (new_index, new_spk) in results {
483+
let _inserted = this
484+
.inner
485+
.insert_spk((keychain.clone(), new_index), new_spk);
486+
debug_assert!(_inserted, "replenish lookahead: must not have existing spk: keychain={:?}, lookahead={}, next_store_index={}, next_reveal_index={}", keychain, lookahead, next_store_index, next_reveal_index);
487+
}
488+
};
489+
490+
#[cfg(not(feature = "std"))]
459491
{
460-
let _inserted = self
461-
.inner
462-
.insert_spk((keychain.clone(), new_index), new_spk);
463-
debug_assert!(_inserted, "replenish lookahead: must not have existing spk: keychain={:?}, lookahead={}, next_store_index={}, next_reveal_index={}", keychain, lookahead, next_store_index, next_reveal_index);
492+
let results = generate_spks(descriptor, next_store_index, end_index);
493+
process_results(self, results);
494+
return;
495+
}
496+
497+
#[cfg(feature = "std")]
498+
{
499+
if total_indices < 1000 {
500+
let results = generate_spks(descriptor, next_store_index, end_index);
501+
process_results(self, results);
502+
return;
503+
}
504+
505+
let num_cpus = thread::available_parallelism()
506+
.map(|p| p.get())
507+
.unwrap_or(1);
508+
let chunk_size = (total_indices + num_cpus as u32 - 1) / num_cpus as u32;
509+
let descriptor = Arc::new(descriptor.clone());
510+
let mut handles = Vec::with_capacity(num_cpus);
511+
512+
for i in 0..num_cpus {
513+
let start = next_store_index + (i as u32 * chunk_size);
514+
if start >= end_index {
515+
break;
516+
}
517+
518+
let end = (start + chunk_size).min(end_index);
519+
let descriptor_clone = Arc::clone(&descriptor);
520+
521+
let handle = thread::spawn(move || {
522+
let mut thread_results = Vec::with_capacity((end - start) as usize);
523+
524+
for (new_index, new_spk) in
525+
SpkIterator::new_with_range(&*descriptor_clone, start..end)
526+
{
527+
thread_results.push((new_index, new_spk));
528+
}
529+
530+
thread_results
531+
});
532+
533+
handles.push(handle);
534+
}
535+
536+
let mut all_results = Vec::new();
537+
for handle in handles {
538+
if let Ok(thread_results) = handle.join() {
539+
all_results.extend(thread_results);
540+
}
541+
}
542+
543+
// Sort results by index to ensure they're processed in order
544+
all_results.sort_by_key(|(index, _)| *index);
545+
546+
process_results(self, all_results);
464547
}
465548
}
466549

crates/chain/tests/test_keychain_txout_index.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -542,10 +542,10 @@ fn lookahead_to_target() {
542542
},
543543
TestCase {
544544
lookahead: 13,
545-
external_last_revealed: Some(100),
546-
internal_last_revealed: Some(21),
547-
internal_target: Some(120),
548-
external_target: Some(130),
545+
external_last_revealed: Some(1100),
546+
internal_last_revealed: Some(1200),
547+
internal_target: Some(1110),
548+
external_target: Some(1120),
549549
},
550550
];
551551

0 commit comments

Comments
 (0)