Skip to content

Commit 9dad04f

Browse files
committed
Add --jobs parameter to customize concurrency
Enables users to tradeoff cpu/memory usage for performance Made `out_of_order` blocks bounded by restricting the number of workers being spawned
1 parent 0f8de96 commit 9dad04f

File tree

5 files changed

+29
-16
lines changed

5 files changed

+29
-16
lines changed

node/src/bin/spaced.rs

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl Composer {
6060
spaced.chain.state.clone(),
6161
rx,
6262
self.shutdown.clone(),
63+
spaced.num_workers,
6364
);
6465

6566
self.services.spawn(async move {

node/src/config.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub struct Args {
4747
/// Network to use
4848
#[arg(long, env = "SPACED_CHAIN")]
4949
chain: ExtendedNetwork,
50+
/// Number of concurrent workers allowed during syncing
51+
#[arg(short, long, env = "SPACED_JOBS", default_value = "8")]
52+
jobs: u8,
5053
/// Bitcoin RPC URL
5154
#[arg(long, env = "SPACED_BITCOIN_RPC_URL")]
5255
bitcoin_rpc_url: Option<String>,
@@ -68,7 +71,6 @@ pub struct Args {
6871
#[arg(long, help_heading = Some(RPC_OPTIONS), default_values = ["127.0.0.1", "::1"], env = "SPACED_RPC_BIND")]
6972
rpc_bind: Vec<String>,
7073
/// Listen for JSON-RPC connections on <port>
71-
/// (default: 22220, testnet: 22221, signet: 22224, regtest: 22226)
7274
#[arg(long, help_heading = Some(RPC_OPTIONS), env = "SPACED_RPC_PORT")]
7375
rpc_port: Option<u16>,
7476
}
@@ -183,6 +185,7 @@ impl Args {
183185
bind: rpc_bind_addresses,
184186
chain,
185187
block_index,
188+
num_workers: args.jobs as usize,
186189
})
187190
}
188191

node/src/source.rs

+16-12
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct BlockFetcher {
3838
rpc: Arc<BitcoinRpc>,
3939
job_id: Arc<AtomicUsize>,
4040
sender: std::sync::mpsc::SyncSender<BlockEvent>,
41+
num_workers: usize,
4142
}
4243

4344
pub enum BlockEvent {
@@ -304,6 +305,7 @@ impl BlockFetcher {
304305
pub fn new(
305306
rpc: BitcoinRpc,
306307
client: reqwest::blocking::Client,
308+
num_workers: usize,
307309
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
308310
let (tx, rx) = std::sync::mpsc::sync_channel(12);
309311
(
@@ -312,6 +314,7 @@ impl BlockFetcher {
312314
rpc: Arc::new(rpc),
313315
job_id: Arc::new(AtomicUsize::new(0)),
314316
sender: tx,
317+
num_workers,
315318
},
316319
rx,
317320
)
@@ -329,6 +332,7 @@ impl BlockFetcher {
329332
let task_rpc = self.rpc.clone();
330333
let current_task = self.job_id.clone();
331334
let task_sender = self.sender.clone();
335+
let num_workers = self.num_workers;
332336

333337
_ = std::thread::spawn(move || {
334338
let mut last_check = Instant::now() - Duration::from_secs(2);
@@ -354,16 +358,14 @@ impl BlockFetcher {
354358
};
355359

356360
if tip > start_block.height {
357-
let concurrency = std::cmp::min(tip - start_block.height, 8);
358-
359361
let res = Self::run_workers(
360362
job_id,
361363
current_task.clone(),
362364
task_rpc.clone(),
363365
task_sender.clone(),
364366
start_block,
365367
tip,
366-
concurrency as usize,
368+
num_workers,
367369
);
368370

369371
match res {
@@ -388,7 +390,7 @@ impl BlockFetcher {
388390
sender: std::sync::mpsc::SyncSender<BlockEvent>,
389391
start_block: ChainAnchor,
390392
end_height: u32,
391-
concurrency: usize,
393+
num_workers: usize,
392394
) -> Result<ChainAnchor, BlockFetchError> {
393395
let mut workers = Workers {
394396
current_job,
@@ -399,8 +401,8 @@ impl BlockFetcher {
399401
end_height,
400402
ordered_sender: sender,
401403
rpc,
402-
concurrency,
403-
pool: ThreadPool::new(concurrency),
404+
num_workers,
405+
pool: ThreadPool::new(num_workers),
404406
};
405407

406408
workers.run()
@@ -464,7 +466,7 @@ struct Workers {
464466
end_height: u32,
465467
ordered_sender: std::sync::mpsc::SyncSender<BlockEvent>,
466468
rpc: Arc<BitcoinRpc>,
467-
concurrency: usize,
469+
num_workers: usize,
468470
pool: ThreadPool,
469471
}
470472

@@ -508,7 +510,9 @@ impl Workers {
508510

509511
#[inline(always)]
510512
fn can_add_workers(&self) -> bool {
511-
self.pool.queued_count() < self.concurrency && !self.queued_all()
513+
self.out_of_order.len() < self.num_workers
514+
&& self.pool.queued_count() < self.num_workers
515+
&& !self.queued_all()
512516
}
513517

514518
#[inline(always)]
@@ -518,7 +522,7 @@ impl Workers {
518522

519523
fn run(&mut self) -> Result<ChainAnchor, BlockFetchError> {
520524
let client = reqwest::blocking::Client::new();
521-
let (tx, rx) = std::sync::mpsc::sync_channel(self.concurrency);
525+
let (tx, rx) = std::sync::mpsc::sync_channel(self.num_workers);
522526

523527
'queue_blocks: while !self.queued_all() {
524528
if self.should_stop() {
@@ -559,15 +563,15 @@ impl Workers {
559563
}
560564
}
561565

562-
std::thread::sleep(Duration::from_millis(10));
566+
std::thread::sleep(Duration::from_millis(1));
563567
}
564568

565569
// Wait for all blocks to be processed
566570
while self.pool.active_count() > 0 || self.last_emitted.height != self.end_height {
567571
while self.try_emit_next_block(&rx)? {
568572
// do nothing
569573
}
570-
std::thread::sleep(Duration::from_millis(10));
574+
std::thread::sleep(Duration::from_millis(1));
571575
}
572576
Ok(self.last_emitted)
573577
}
@@ -753,7 +757,7 @@ mod test {
753757
let start_block_hash: BlockHash =
754758
rpc.send_json_blocking(&client, &rpc.get_block_hash(start_block))?;
755759

756-
let (fetcher, receiver) = BlockFetcher::new(rpc, client);
760+
let (fetcher, receiver) = BlockFetcher::new(rpc, client, 8);
757761

758762
println!("fetcher starting from block {}", start_block);
759763

node/src/sync.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct Spaced {
3636
pub rpc: BitcoinRpc,
3737
pub data_dir: PathBuf,
3838
pub bind: Vec<SocketAddr>,
39+
pub num_workers: usize,
3940
}
4041

4142
impl Spaced {
@@ -155,7 +156,7 @@ impl Spaced {
155156

156157
let rpc = source.rpc.clone();
157158
let client = reqwest::blocking::Client::new();
158-
let (fetcher, receiver) = BlockFetcher::new(rpc.clone(), client.clone());
159+
let (fetcher, receiver) = BlockFetcher::new(rpc.clone(), client.clone(), self.num_workers);
159160
fetcher.start(start_block);
160161

161162
let mut shutdown_signal = shutdown.subscribe();

node/src/wallets.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,10 @@ impl RpcWallet {
282282
mut wallet: SpacesWallet,
283283
mut commands: Receiver<WalletCommand>,
284284
mut shutdown: broadcast::Receiver<()>,
285+
num_workers: usize,
285286
) -> anyhow::Result<()> {
286-
let (fetcher, receiver) = BlockFetcher::new(source.rpc.clone(), source.client.clone());
287+
let (fetcher, receiver) =
288+
BlockFetcher::new(source.rpc.clone(), source.client.clone(), num_workers);
287289

288290
let mut wallet_tip = {
289291
let tip = wallet.coins.local_chain().tip();
@@ -706,6 +708,7 @@ impl RpcWallet {
706708
store: LiveSnapshot,
707709
mut channel: Receiver<LoadedWallet>,
708710
shutdown: broadcast::Sender<()>,
711+
num_workers: usize,
709712
) -> anyhow::Result<()> {
710713
let mut shutdown_signal = shutdown.subscribe();
711714
let mut wallet_results = FuturesUnordered::new();
@@ -734,7 +737,8 @@ impl RpcWallet {
734737
wallet_chain,
735738
loaded.wallet,
736739
loaded.rx,
737-
wallet_shutdown
740+
wallet_shutdown,
741+
num_workers
738742
));
739743
});
740744
wallet_results.push(named_future(wallet_name, rx));

0 commit comments

Comments
 (0)