Skip to content

Commit 2bd559b

Browse files
committed
Implement sync_onchain_wallet for ChainSource::Electrum
1 parent 6ac8b91 commit 2bd559b

File tree

3 files changed

+169
-3
lines changed

3 files changed

+169
-3
lines changed

src/chain/electrum.rs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
// accordance with one or both of these licenses.
77

88
use crate::config::{
9-
Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
10-
TX_BROADCAST_TIMEOUT_SECS,
9+
Config, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
1111
};
1212
use crate::error::Error;
1313
use crate::fee_estimator::{
@@ -20,6 +20,12 @@ use lightning::chain::{Confirm, Filter, WatchedOutput};
2020
use lightning::util::ser::Writeable;
2121
use lightning_transaction_sync::ElectrumSyncClient;
2222

23+
use bdk_chain::bdk_core::spk_client::FullScanRequest as BdkFullScanRequest;
24+
use bdk_chain::bdk_core::spk_client::FullScanResponse as BdkFullScanResponse;
25+
use bdk_chain::bdk_core::spk_client::SyncRequest as BdkSyncRequest;
26+
use bdk_chain::bdk_core::spk_client::SyncResponse as BdkSyncResponse;
27+
use bdk_wallet::KeychainKind as BdkKeyChainKind;
28+
2329
use bdk_electrum::BdkElectrumClient;
2430

2531
use electrum_client::Client as ElectrumClient;
@@ -32,6 +38,7 @@ use std::collections::HashMap;
3238
use std::sync::Arc;
3339
use std::time::{Duration, Instant};
3440

41+
const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
3542
const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3;
3643
const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20;
3744

@@ -109,6 +116,69 @@ impl ElectrumRuntimeClient {
109116
Ok(res)
110117
}
111118

119+
pub(crate) async fn get_full_scan_wallet_update(
120+
&self, request: BdkFullScanRequest<BdkKeyChainKind>,
121+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
122+
) -> Result<BdkFullScanResponse<BdkKeyChainKind>, Error> {
123+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
124+
bdk_electrum_client.populate_tx_cache(cached_txs);
125+
126+
let spawn_fut = self.runtime.spawn_blocking(move || {
127+
bdk_electrum_client.full_scan(
128+
request,
129+
BDK_CLIENT_STOP_GAP,
130+
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
131+
true,
132+
)
133+
});
134+
let wallet_sync_timeout_fut =
135+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
136+
137+
wallet_sync_timeout_fut
138+
.await
139+
.map_err(|e| {
140+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
141+
Error::WalletOperationTimeout
142+
})?
143+
.map_err(|e| {
144+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
145+
Error::WalletOperationFailed
146+
})?
147+
.map_err(|e| {
148+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
149+
Error::WalletOperationFailed
150+
})
151+
}
152+
153+
pub(crate) async fn get_incremental_sync_wallet_update(
154+
&self, request: BdkSyncRequest<(BdkKeyChainKind, u32)>,
155+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
156+
) -> Result<BdkSyncResponse, Error> {
157+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
158+
bdk_electrum_client.populate_tx_cache(cached_txs);
159+
160+
let spawn_fut = self.runtime.spawn_blocking(move || {
161+
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
162+
});
163+
let wallet_sync_timeout_fut =
164+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
165+
166+
wallet_sync_timeout_fut
167+
.await
168+
.map_err(|e| {
169+
log_error!(self.logger, "Incremental sync of on-chain wallet timed out: {}", e);
170+
Error::WalletOperationTimeout
171+
})?
172+
.map_err(|e| {
173+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
174+
Error::WalletOperationFailed
175+
})?
176+
.map_err(|e| {
177+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
178+
Error::WalletOperationFailed
179+
})
180+
}
181+
112182
pub(crate) async fn broadcast(&self, tx: Transaction) {
113183
let electrum_client = Arc::clone(&self.electrum_client);
114184

src/chain/mod.rs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader};
3939
use lightning_block_sync::SpvClient;
4040

4141
use bdk_esplora::EsploraAsyncExt;
42+
use bdk_wallet::Update as BdkUpdate;
4243

4344
use esplora_client::AsyncClient as EsploraAsyncClient;
4445

@@ -717,7 +718,98 @@ impl ChainSource {
717718

718719
res
719720
},
720-
Self::Electrum { .. } => todo!(),
721+
Self::Electrum {
722+
electrum_runtime_status,
723+
onchain_wallet,
724+
onchain_wallet_sync_status,
725+
kv_store,
726+
logger,
727+
node_metrics,
728+
..
729+
} => {
730+
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
731+
electrum_runtime_status.read().unwrap().client().as_ref()
732+
{
733+
Arc::clone(client)
734+
} else {
735+
debug_assert!(
736+
false,
737+
"We should have started the chain source before syncing the onchain wallet"
738+
);
739+
return Err(Error::FeerateEstimationUpdateFailed);
740+
};
741+
let receiver_res = {
742+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
743+
status_lock.register_or_subscribe_pending_sync()
744+
};
745+
if let Some(mut sync_receiver) = receiver_res {
746+
log_info!(logger, "Sync in progress, skipping.");
747+
return sync_receiver.recv().await.map_err(|e| {
748+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
749+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
750+
Error::WalletOperationFailed
751+
})?;
752+
}
753+
754+
// If this is our first sync, do a full scan with the configured gap limit.
755+
// Otherwise just do an incremental sync.
756+
let incremental_sync =
757+
node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
758+
759+
let apply_wallet_update =
760+
|update_res: Result<BdkUpdate, Error>, now: Instant| match update_res {
761+
Ok(update) => match onchain_wallet.apply_update(update) {
762+
Ok(()) => {
763+
log_info!(
764+
logger,
765+
"{} of on-chain wallet finished in {}ms.",
766+
if incremental_sync { "Incremental sync" } else { "Sync" },
767+
now.elapsed().as_millis()
768+
);
769+
let unix_time_secs_opt = SystemTime::now()
770+
.duration_since(UNIX_EPOCH)
771+
.ok()
772+
.map(|d| d.as_secs());
773+
{
774+
let mut locked_node_metrics = node_metrics.write().unwrap();
775+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
776+
unix_time_secs_opt;
777+
write_node_metrics(
778+
&*locked_node_metrics,
779+
Arc::clone(&kv_store),
780+
Arc::clone(&logger),
781+
)?;
782+
}
783+
Ok(())
784+
},
785+
Err(e) => Err(e),
786+
},
787+
Err(e) => Err(e),
788+
};
789+
790+
let cached_txs = onchain_wallet.get_cached_txs();
791+
792+
let res = if incremental_sync {
793+
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
794+
let incremental_sync_fut = electrum_client
795+
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs);
796+
797+
let now = Instant::now();
798+
let update_res = incremental_sync_fut.await.map(|u| u.into());
799+
apply_wallet_update(update_res, now)
800+
} else {
801+
let full_scan_request = onchain_wallet.get_full_scan_request();
802+
let full_scan_fut =
803+
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs);
804+
let now = Instant::now();
805+
let update_res = full_scan_fut.await.map(|u| u.into());
806+
apply_wallet_update(update_res, now)
807+
};
808+
809+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
810+
811+
res
812+
},
721813
Self::BitcoindRpc { .. } => {
722814
// In BitcoindRpc mode we sync lightning and onchain wallet in one go by via
723815
// `ChainPoller`. So nothing to do here.

src/wallet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ where
9898
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
9999
}
100100

101+
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
102+
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
103+
}
104+
101105
pub(crate) fn current_best_block(&self) -> BestBlock {
102106
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
103107
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }

0 commit comments

Comments
 (0)