Skip to content

Commit 0f8de96

Browse files
committed
Refactor block fetcher
1 parent 8fc2d02 commit 0f8de96

File tree

1 file changed

+191
-80
lines changed

1 file changed

+191
-80
lines changed

node/src/source.rs

Lines changed: 191 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
fmt,
44
sync::{
55
atomic::{AtomicU64, AtomicUsize, Ordering},
6+
mpsc::Receiver,
67
Arc,
78
},
89
time::Duration,
@@ -382,93 +383,27 @@ impl BlockFetcher {
382383

383384
fn run_workers(
384385
job_id: usize,
385-
current_task: Arc<AtomicUsize>,
386+
current_job: Arc<AtomicUsize>,
386387
rpc: Arc<BitcoinRpc>,
387388
sender: std::sync::mpsc::SyncSender<BlockEvent>,
388389
start_block: ChainAnchor,
389390
end_height: u32,
390391
concurrency: usize,
391392
) -> Result<ChainAnchor, BlockFetchError> {
392-
let pool = ThreadPool::new(concurrency);
393-
let client = reqwest::blocking::Client::new();
394-
395-
let (tx, rx) = std::sync::mpsc::sync_channel(1);
396-
397-
let mut queued_height = start_block.height + 1;
398-
399-
let mut parsed_blocks = BTreeMap::new();
400-
let mut previous_hash = start_block.hash;
401-
let mut next_emit_height = queued_height;
402-
403-
while queued_height <= end_height || pool.active_count() > 0 || !parsed_blocks.is_empty() {
404-
if current_task.load(Ordering::SeqCst) != job_id {
405-
return Err(BlockFetchError::ChannelClosed);
406-
}
407-
408-
while pool.queued_count() < concurrency && queued_height <= end_height {
409-
if current_task.load(Ordering::SeqCst) != job_id {
410-
return Err(BlockFetchError::ChannelClosed);
411-
}
412-
413-
let tx = tx.clone();
414-
let rpc = rpc.clone();
415-
let task_client = client.clone();
416-
let task_sigterm = current_task.clone();
417-
418-
pool.execute(move || {
419-
if task_sigterm.load(Ordering::SeqCst) != job_id {
420-
return;
421-
}
422-
let result: Result<_, BitcoinRpcError> = (move || {
423-
let hash: BlockHash = rpc
424-
.send_json_blocking(&task_client, &rpc.get_block_hash(queued_height))?;
425-
let block = Self::fetch_block(&rpc, &task_client, &hash)?;
426-
Ok((
427-
queued_height,
428-
ChainAnchor {
429-
height: queued_height,
430-
hash,
431-
},
432-
block,
433-
))
434-
})();
435-
436-
_ = tx.send(result);
437-
});
438-
439-
queued_height += 1;
440-
}
441-
442-
// Check if any blocks are ready to emit
443-
if let Ok(result) = rx.try_recv() {
444-
if current_task.load(Ordering::SeqCst) != job_id {
445-
return Err(BlockFetchError::ChannelClosed);
446-
}
447-
448-
let (height, id, block) = result?;
449-
parsed_blocks.insert(height, (id, block));
450-
451-
// Emit blocks in order
452-
while let Some((id, block)) = parsed_blocks.remove(&next_emit_height) {
453-
if current_task.load(Ordering::SeqCst) != job_id {
454-
return Err(BlockFetchError::ChannelClosed);
455-
}
456-
if block.header.prev_blockhash != previous_hash {
457-
return Err(BlockFetchError::BlockMismatch);
458-
}
459-
sender
460-
.send(BlockEvent::Block(id.clone(), block))
461-
.map_err(|_| BlockFetchError::ChannelClosed)?;
462-
previous_hash = id.hash;
463-
next_emit_height += 1;
464-
}
465-
}
466-
}
393+
let mut workers = Workers {
394+
current_job,
395+
job_id,
396+
out_of_order: Default::default(),
397+
last_emitted: start_block,
398+
queued_height: start_block.height + 1,
399+
end_height,
400+
ordered_sender: sender,
401+
rpc,
402+
concurrency,
403+
pool: ThreadPool::new(concurrency),
404+
};
467405

468-
Ok(ChainAnchor {
469-
height: next_emit_height - 1,
470-
hash: previous_hash,
471-
})
406+
workers.run()
472407
}
473408

474409
pub fn fetch_block(
@@ -520,6 +455,129 @@ impl BlockFetcher {
520455
}
521456
}
522457

458+
struct Workers {
459+
current_job: Arc<AtomicUsize>,
460+
job_id: usize,
461+
out_of_order: BTreeMap<u32, (ChainAnchor, Block)>,
462+
last_emitted: ChainAnchor,
463+
queued_height: u32,
464+
end_height: u32,
465+
ordered_sender: std::sync::mpsc::SyncSender<BlockEvent>,
466+
rpc: Arc<BitcoinRpc>,
467+
concurrency: usize,
468+
pool: ThreadPool,
469+
}
470+
471+
type RpcBlockReceiver = Receiver<Result<(ChainAnchor, Block), BitcoinRpcError>>;
472+
473+
impl Workers {
474+
fn try_emit_next_block(
475+
&mut self,
476+
unordered: &RpcBlockReceiver,
477+
) -> Result<bool, BlockFetchError> {
478+
while let Ok(unordered_block) = unordered.try_recv() {
479+
if self.should_stop() {
480+
return Err(BlockFetchError::ChannelClosed);
481+
}
482+
let (id, block) = unordered_block?;
483+
self.out_of_order.insert(id.height, (id, block));
484+
}
485+
486+
let next = self.last_emitted.height + 1;
487+
if next > self.end_height {
488+
return Ok(false);
489+
}
490+
491+
if let Some((id, block)) = self.out_of_order.remove(&next) {
492+
if self.should_stop() {
493+
return Err(BlockFetchError::ChannelClosed);
494+
}
495+
496+
if block.header.prev_blockhash != self.last_emitted.hash {
497+
return Err(BlockFetchError::BlockMismatch);
498+
}
499+
500+
self.last_emitted = id;
501+
self.ordered_sender
502+
.send(BlockEvent::Block(id.clone(), block))
503+
.map_err(|_| BlockFetchError::ChannelClosed)?;
504+
return Ok(true);
505+
}
506+
Ok(false)
507+
}
508+
509+
#[inline(always)]
510+
fn can_add_workers(&self) -> bool {
511+
self.pool.queued_count() < self.concurrency && !self.queued_all()
512+
}
513+
514+
#[inline(always)]
515+
fn queued_all(&self) -> bool {
516+
self.queued_height > self.end_height
517+
}
518+
519+
fn run(&mut self) -> Result<ChainAnchor, BlockFetchError> {
520+
let client = reqwest::blocking::Client::new();
521+
let (tx, rx) = std::sync::mpsc::sync_channel(self.concurrency);
522+
523+
'queue_blocks: while !self.queued_all() {
524+
if self.should_stop() {
525+
return Err(BlockFetchError::ChannelClosed);
526+
}
527+
528+
while self.can_add_workers() {
529+
if self.should_stop() {
530+
return Err(BlockFetchError::ChannelClosed);
531+
}
532+
let tx = tx.clone();
533+
let rpc = self.rpc.clone();
534+
let task_client = client.clone();
535+
let task_sigterm = self.current_job.clone();
536+
let height = self.queued_height;
537+
let job_id = self.job_id;
538+
539+
self.pool.execute(move || {
540+
if task_sigterm.load(Ordering::SeqCst) != job_id {
541+
return;
542+
}
543+
let result: Result<_, BitcoinRpcError> = (move || {
544+
let hash: BlockHash =
545+
rpc.send_json_blocking(&task_client, &rpc.get_block_hash(height))?;
546+
let block = BlockFetcher::fetch_block(&rpc, &task_client, &hash)?;
547+
Ok((ChainAnchor { height, hash }, block))
548+
})();
549+
_ = tx.send(result);
550+
});
551+
self.queued_height += 1;
552+
}
553+
554+
// Emits any completed blocks while workers are processing
555+
while self.try_emit_next_block(&rx)? {
556+
// If pool has availability queue up more instead
557+
if self.can_add_workers() {
558+
continue 'queue_blocks;
559+
}
560+
}
561+
562+
std::thread::sleep(Duration::from_millis(10));
563+
}
564+
565+
// Wait for all blocks to be processed
566+
while self.pool.active_count() > 0 || self.last_emitted.height != self.end_height {
567+
while self.try_emit_next_block(&rx)? {
568+
// do nothing
569+
}
570+
std::thread::sleep(Duration::from_millis(10));
571+
}
572+
Ok(self.last_emitted)
573+
}
574+
575+
#[inline(always)]
576+
fn should_stop(&self) -> bool {
577+
self.current_job.load(Ordering::SeqCst) != self.job_id
578+
}
579+
}
580+
523581
// From hex crate
524582
pub(crate) fn hex_to_bytes(mut hex_data: Vec<u8>) -> Result<Vec<u8>, FromHexError> {
525583
let len = hex_data.len() / 2;
@@ -670,3 +728,56 @@ impl BlockSource for BitcoinBlockSource {
670728
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
671729
}
672730
}
731+
732+
#[cfg(test)]
733+
mod test {
734+
use protocol::{
735+
bitcoin,
736+
bitcoin::{BlockHash, Network},
737+
constants::ChainAnchor,
738+
};
739+
740+
use crate::source::{BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher};
741+
742+
#[test]
743+
fn test_fetcher() -> anyhow::Result<()> {
744+
let rpc = BitcoinRpc::new(
745+
"http://127.0.0.1:18332",
746+
BitcoinRpcAuth::UserPass("test".to_string(), "test".to_string()),
747+
);
748+
749+
let client = reqwest::blocking::Client::new();
750+
let count: u32 = rpc.send_json_blocking(&client, &rpc.get_block_count())?;
751+
752+
let start_block = count - 1;
753+
let start_block_hash: BlockHash =
754+
rpc.send_json_blocking(&client, &rpc.get_block_hash(start_block))?;
755+
756+
let (fetcher, receiver) = BlockFetcher::new(rpc, client);
757+
758+
println!("fetcher starting from block {}", start_block);
759+
760+
fetcher.start(ChainAnchor {
761+
hash: start_block_hash,
762+
height: start_block,
763+
});
764+
765+
println!("fetcher receiving blocks");
766+
while let Ok(event) = receiver.recv() {
767+
match event {
768+
BlockEvent::Block(id, _) => {
769+
println!("got block {}", id.height);
770+
if id.height >= count {
771+
break;
772+
}
773+
}
774+
BlockEvent::Error(e) => {
775+
println!("error: {}", e.to_string());
776+
break;
777+
}
778+
}
779+
}
780+
781+
Ok(())
782+
}
783+
}

0 commit comments

Comments
 (0)