Skip to content

Commit 76708c2

Browse files
committed
Refactor sync checks
1 parent 697c180 commit 76708c2

File tree

7 files changed

+136
-36
lines changed

7 files changed

+136
-36
lines changed

client/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use spaces_protocol::{
1414
validate::{TxChangeSet, UpdateKind, Validator},
1515
Bytes, Covenant, FullSpaceOut, RevokeReason, SpaceOut,
1616
};
17-
use spaces_wallet::bitcoin::Transaction;
17+
use spaces_wallet::bitcoin::{Network, Transaction};
1818

1919
use crate::{
2020
source::BitcoinRpcError,
@@ -27,7 +27,7 @@ pub trait BlockSource {
2727
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
2828
fn in_mempool(&self, txid: &Txid, height: u32) -> Result<bool, BitcoinRpcError>;
2929
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
30-
fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError>;
30+
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError>;
3131
}
3232

3333
#[derive(Debug, Clone)]

client/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ pub struct Args {
7979
/// Skip maintaining historical root anchors
8080
#[arg(long, env = "SPACED_SKIP_ANCHORS", default_value = "false")]
8181
skip_anchors: bool,
82+
/// The specified Bitcoin RPC is a light client
83+
#[arg(long, env = "SPACED_BITCOIN_RPC_LIGHT", default_value = "false")]
84+
bitcoin_rpc_light: bool,
8285
}
8386

8487
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, ValueEnum, Serialize, Deserialize)]
@@ -164,6 +167,7 @@ impl Args {
164167
let rpc = BitcoinRpc::new(
165168
&args.bitcoin_rpc_url.expect("bitcoin rpc url"),
166169
bitcoin_rpc_auth,
170+
!args.bitcoin_rpc_light
167171
);
168172

169173
let genesis = Spaced::genesis(&rpc, args.chain).await?;

client/src/rpc.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ pub(crate) type Responder<T> = oneshot::Sender<T>;
6666

6767
#[derive(Debug, Clone, Serialize, Deserialize)]
6868
pub struct ServerInfo {
69-
pub chain: ExtendedNetwork,
69+
pub chain: String,
7070
pub tip: ChainAnchor,
71+
pub progress: Option<f32>,
7172
}
7273

7374
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -99,8 +100,8 @@ pub enum ChainStateCommand {
99100
txs: Vec<String>,
100101
resp: Responder<anyhow::Result<Vec<Option<TxChangeSet>>>>,
101102
},
102-
GetTip {
103-
resp: Responder<anyhow::Result<ChainAnchor>>,
103+
GetServerInfo {
104+
resp: Responder<anyhow::Result<ServerInfo>>,
104105
},
105106
GetSpace {
106107
hash: SpaceKey,
@@ -723,13 +724,12 @@ impl RpcServerImpl {
723724
#[async_trait]
724725
impl RpcServer for RpcServerImpl {
725726
async fn get_server_info(&self) -> Result<ServerInfo, ErrorObjectOwned> {
726-
let chain = self.wallet_manager.network;
727-
let tip = self
727+
let info = self
728728
.store
729-
.get_tip()
729+
.get_server_info()
730730
.await
731731
.map_err(|error| ErrorObjectOwned::owned(-1, error.to_string(), None::<String>))?;
732-
Ok(ServerInfo { chain, tip })
732+
Ok(info)
733733
}
734734

735735
async fn get_space(
@@ -1092,6 +1092,7 @@ impl AsyncChainState {
10921092
.find(|tx| &tx.changeset.txid == txid))
10931093
}
10941094

1095+
10951096
async fn get_indexed_block(
10961097
index: &mut Option<LiveSnapshot>,
10971098
height_or_hash: HeightOrHash,
@@ -1173,9 +1174,9 @@ impl AsyncChainState {
11731174
let result = emulator.apply_package(tip.height + 1, txs);
11741175
let _ = resp.send(result);
11751176
}
1176-
ChainStateCommand::GetTip { resp } => {
1177+
ChainStateCommand::GetServerInfo { resp } => {
11771178
let tip = chain_state.tip.read().expect("read meta").clone();
1178-
_ = resp.send(Ok(tip))
1179+
_ = resp.send(get_server_info(client, rpc, tip).await)
11791180
}
11801181
ChainStateCommand::GetSpace { hash, resp } => {
11811182
let result = chain_state.get_space_info(&hash);
@@ -1498,9 +1499,9 @@ impl AsyncChainState {
14981499
resp_rx.await?
14991500
}
15001501

1501-
pub async fn get_tip(&self) -> anyhow::Result<ChainAnchor> {
1502+
pub async fn get_server_info(&self) -> anyhow::Result<ServerInfo> {
15021503
let (resp, resp_rx) = oneshot::channel();
1503-
self.sender.send(ChainStateCommand::GetTip { resp }).await?;
1504+
self.sender.send(ChainStateCommand::GetServerInfo { resp }).await?;
15041505
resp_rx.await?
15051506
}
15061507

@@ -1561,3 +1562,27 @@ fn get_space_key(space_or_hash: &str) -> Result<SpaceKey, ErrorObjectOwned> {
15611562

15621563
Ok(SpaceKey::from(hash))
15631564
}
1565+
1566+
1567+
async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainAnchor) -> anyhow::Result<ServerInfo> {
1568+
#[derive(Deserialize)]
1569+
struct Info {
1570+
pub chain: String,
1571+
pub headers: u32,
1572+
}
1573+
1574+
let info: Info = rpc
1575+
.send_json(client, &rpc.get_blockchain_info())
1576+
.await
1577+
.map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?;
1578+
1579+
Ok(ServerInfo {
1580+
chain: info.chain,
1581+
tip,
1582+
progress: if info.headers >= tip.height {
1583+
Some(tip.height as f32 / info.headers as f32)
1584+
} else {
1585+
None
1586+
}
1587+
})
1588+
}

client/src/source.rs

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ use std::{
1212
use base64::Engine;
1313
use bitcoin::{Block, BlockHash, Txid};
1414
use hex::FromHexError;
15-
use log::error;
15+
use log::{error, warn};
1616
use reqwest::StatusCode;
1717
use serde::{de::DeserializeOwned, Deserialize, Serialize};
1818
use serde_json::Value;
1919
use spaces_protocol::constants::ChainAnchor;
2020
use spaces_wallet::{bitcoin, bitcoin::Transaction};
2121
use threadpool::ThreadPool;
2222
use tokio::time::Instant;
23-
23+
use spaces_protocol::bitcoin::Network;
2424
use crate::{client::BlockSource, std_wait};
2525

2626
const BITCOIN_RPC_IN_WARMUP: i32 = -28; // Client still warming up
@@ -34,9 +34,11 @@ pub struct BitcoinRpc {
3434
id: Arc<AtomicU64>,
3535
auth_token: Option<String>,
3636
url: String,
37+
legacy: bool
3738
}
3839

3940
pub struct BlockFetcher {
41+
chain: Network,
4042
src: BitcoinBlockSource,
4143
job_id: Arc<AtomicUsize>,
4244
sender: std::sync::mpsc::SyncSender<BlockEvent>,
@@ -121,18 +123,23 @@ trait ErrorForRpcBlocking {
121123
}
122124

123125
impl BitcoinRpc {
124-
pub fn new(url: &str, auth: BitcoinRpcAuth) -> Self {
126+
pub fn new(url: &str, auth: BitcoinRpcAuth, legacy: bool) -> Self {
125127
Self {
126128
id: Default::default(),
127129
auth_token: auth.to_token(),
128130
url: url.to_string(),
131+
legacy,
129132
}
130133
}
131134

132-
pub fn make_request(&self, method: &str, params: serde_json::Value) -> BitcoinRpcRequest {
135+
pub fn make_request(&self, method: &str, params: Value) -> BitcoinRpcRequest {
133136
let id = self.id.fetch_add(1, Ordering::Relaxed);
134137
let body = serde_json::json!({
135-
"jsonrpc": "1.0",
138+
"jsonrpc": if self.legacy {
139+
"1.0"
140+
} else {
141+
"2.0"
142+
},
136143
"id": id.to_string(),
137144
"method": method,
138145
"params": params,
@@ -381,12 +388,14 @@ impl BitcoinRpcAuth {
381388

382389
impl BlockFetcher {
383390
pub fn new(
391+
chain: Network,
384392
src: BitcoinBlockSource,
385393
num_workers: usize,
386394
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
387395
let (tx, rx) = std::sync::mpsc::sync_channel(12);
388396
(
389397
Self {
398+
chain,
390399
src,
391400
job_id: Arc::new(AtomicUsize::new(0)),
392401
sender: tx,
@@ -401,10 +410,15 @@ impl BlockFetcher {
401410
}
402411

403412
fn should_sync(
413+
expected_chain: Network,
404414
source: &BitcoinBlockSource,
405415
start: ChainAnchor,
406416
) -> Result<Option<ChainAnchor>, BlockFetchError> {
407-
let tip = source.get_best_chain()?;
417+
let tip = match source.get_best_chain(Some(start.height), expected_chain)? {
418+
Some(tip) => tip,
419+
None => return Ok(None),
420+
};
421+
408422
if start.height > tip.height {
409423
return Err(BlockFetchError::BlockMismatch);
410424
}
@@ -437,6 +451,7 @@ impl BlockFetcher {
437451
let current_task = self.job_id.clone();
438452
let task_sender = self.sender.clone();
439453
let num_workers = self.num_workers;
454+
let chain = self.chain;
440455

441456
_ = std::thread::spawn(move || {
442457
let mut last_check = Instant::now() - Duration::from_secs(2);
@@ -451,7 +466,7 @@ impl BlockFetcher {
451466
}
452467
last_check = Instant::now();
453468

454-
let tip = match BlockFetcher::should_sync(&task_src, checkpoint) {
469+
let tip = match BlockFetcher::should_sync(chain, &task_src, checkpoint) {
455470
Ok(t) => t,
456471
Err(e) => {
457472
_ = task_sender.send(BlockEvent::Error(e));
@@ -872,21 +887,48 @@ impl BlockSource for BitcoinBlockSource {
872887
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
873888
}
874889

875-
fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError> {
890+
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError> {
876891
#[derive(Deserialize)]
877892
struct Info {
878-
#[serde(rename = "blocks")]
879-
height: u64,
893+
pub chain: String,
894+
pub blocks: u32,
895+
pub headers: u32,
880896
#[serde(rename = "bestblockhash")]
881-
hash: BlockHash,
897+
pub best_block_hash: BlockHash,
882898
}
883899
let info: Info = self
884900
.rpc
885901
.send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?;
886902

887-
Ok(ChainAnchor {
888-
hash: info.hash,
889-
height: info.height as _,
890-
})
903+
let expected_chain = match expected_chain {
904+
Network::Bitcoin => "main",
905+
Network::Regtest => "regtest",
906+
_ => "test"
907+
};
908+
if info.chain != expected_chain {
909+
warn!("Invalid chain from connected rpc node - expected {}, got {}", expected_chain, info.chain);
910+
return Ok(None);
911+
}
912+
913+
let synced = info.headers == info.blocks;
914+
let best_chain = if !synced {
915+
let block_hash = self.get_block_hash(info.blocks)?;
916+
ChainAnchor {
917+
hash: block_hash,
918+
height: info.blocks,
919+
}
920+
} else {
921+
ChainAnchor {
922+
hash: info.best_block_hash,
923+
height: info.headers,
924+
}
925+
};
926+
927+
// If the source is still syncing, and we have a higher tip, wait.
928+
if !synced && tip.is_some_and(|tip| tip > info.blocks) {
929+
return Ok(None);
930+
}
931+
932+
Ok(Some(best_chain))
891933
}
892934
}

client/src/sync.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,11 @@ impl Spaced {
189189
start_block.hash, start_block.height
190190
);
191191

192-
let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers);
192+
let (fetcher, receiver) = BlockFetcher::new(
193+
self.network.fallback_network(),
194+
source.clone(),
195+
self.num_workers,
196+
);
193197
fetcher.start(start_block);
194198

195199
let mut shutdown_signal = shutdown.subscribe();

client/src/wallets.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,20 @@ impl RpcWallet {
357357
synced: bool,
358358
) -> anyhow::Result<()> {
359359
match command {
360-
WalletCommand::GetInfo { resp } => _ = resp.send(Ok(wallet.get_info())),
360+
WalletCommand::GetInfo { resp } =>{
361+
let mut wallet_info = wallet.get_info();
362+
let best_chain = source
363+
.get_best_chain(Some(wallet_info.tip), wallet.config.network);
364+
if let Ok(Some(best_chain)) = best_chain {
365+
wallet_info.progress = if best_chain.height >= wallet_info.tip {
366+
Some(wallet_info.tip as f32 / best_chain.height as f32)
367+
} else {
368+
None
369+
}
370+
}
371+
372+
_ = resp.send(Ok(wallet_info))
373+
},
361374
WalletCommand::BatchTx { request, resp } => {
362375
if !synced && !request.force {
363376
_ = resp.send(Err(anyhow::anyhow!("Wallet is syncing")));
@@ -462,14 +475,17 @@ impl RpcWallet {
462475
protocol: &mut LiveSnapshot,
463476
wallet: &SpacesWallet,
464477
) -> Option<ChainAnchor> {
465-
let bitcoin_tip = match bitcoin.get_best_chain() {
466-
Ok(tip) => tip,
478+
let wallet_tip = wallet.local_chain().tip();
479+
480+
let bitcoin_tip = match bitcoin.get_best_chain(Some(wallet_tip.height()), wallet.config.network) {
481+
Ok(Some(tip)) => tip,
482+
Ok(None) => return None,
467483
Err(e) => {
468484
warn!("Sync check failed: {}", e);
469485
return None;
470486
}
471487
};
472-
let wallet_tip = wallet.local_chain().tip();
488+
473489
let protocol_tip = match protocol.tip.read() {
474490
Ok(tip) => tip.clone(),
475491
Err(e) => {
@@ -493,7 +509,9 @@ impl RpcWallet {
493509
shutdown: broadcast::Sender<()>,
494510
num_workers: usize,
495511
) -> anyhow::Result<()> {
496-
let (fetcher, receiver) = BlockFetcher::new(source.clone(), num_workers);
512+
let (fetcher, receiver) = BlockFetcher::new(network.fallback_network(),
513+
source.clone(),
514+
num_workers);
497515

498516
let mut wallet_tip = {
499517
let tip = wallet.local_chain().tip();
@@ -547,8 +565,13 @@ impl RpcWallet {
547565
}
548566
BlockEvent::Error(e) if matches!(e, BlockFetchError::BlockMismatch) => {
549567
let mut checkpoint_in_chain = None;
550-
let best_chain = match source.get_best_chain() {
551-
Ok(best) => best,
568+
let best_chain = match source.get_best_chain(Some(wallet_tip.height), network.fallback_network()) {
569+
Ok(Some(best)) => best,
570+
Ok(None) => {
571+
warn!("Waiting for source to sync");
572+
fetcher.restart(wallet_tip, &receiver);
573+
continue;
574+
}
552575
Err(error) => {
553576
warn!("Wallet error: {}", error);
554577
fetcher.restart(wallet_tip, &receiver);

wallet/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub struct WalletInfo {
104104
pub start_block: u32,
105105
pub tip: u32,
106106
pub descriptors: Vec<DescriptorInfo>,
107+
pub progress: Option<f32>,
107108
}
108109

109110
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -589,6 +590,7 @@ impl SpacesWallet {
589590
start_block: self.config.start_block,
590591
tip: self.internal.local_chain().tip().height(),
591592
descriptors,
593+
progress: None,
592594
}
593595
}
594596

0 commit comments

Comments
 (0)