Skip to content

Commit 7161ce4

Browse files
committed
f Update GossipSource task to use select
1 parent 7102ff4 commit 7161ce4

File tree

3 files changed

+51
-42
lines changed

3 files changed

+51
-42
lines changed

src/io/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
2727
pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
2828

2929
/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
30-
pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = "";
31-
pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp";
30+
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
31+
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";
3232

3333
/// Provides an interface that allows to store and retrieve persisted values that are associated
3434
/// with given keys.

src/io/utils.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,12 @@ where
174174
Ok(res)
175175
}
176176

177-
pub(crate) fn read_rgs_latest_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
177+
pub(crate) fn read_latest_rgs_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
178178
where
179179
K::Target: KVStore,
180180
{
181181
let mut reader =
182-
kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?;
182+
kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?;
183183
u32::read(&mut reader).map_err(|_| {
184184
std::io::Error::new(
185185
std::io::ErrorKind::InvalidData,
@@ -188,21 +188,21 @@ where
188188
})
189189
}
190190

191-
pub(crate) fn write_rgs_latest_sync_timestamp<K: Deref, L: Deref>(
191+
pub(crate) fn write_latest_rgs_sync_timestamp<K: Deref, L: Deref>(
192192
updated_timestamp: u32, kv_store: K, logger: L,
193193
) -> Result<(), Error>
194194
where
195195
K::Target: KVStore,
196196
L::Target: Logger,
197197
{
198198
let mut writer = kv_store
199-
.write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)
199+
.write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)
200200
.map_err(|e| {
201201
log_error!(
202202
logger,
203203
"Getting writer for key {}/{} failed due to: {}",
204-
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
205-
RGS_LATEST_SYNC_TIMESTAMP_KEY,
204+
LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
205+
LATEST_RGS_SYNC_TIMESTAMP_KEY,
206206
e
207207
);
208208
Error::PersistenceFailed
@@ -211,8 +211,8 @@ where
211211
log_error!(
212212
logger,
213213
"Writing data to key {}/{} failed due to: {}",
214-
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
215-
RGS_LATEST_SYNC_TIMESTAMP_KEY,
214+
LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
215+
LATEST_RGS_SYNC_TIMESTAMP_KEY,
216216
e
217217
);
218218
Error::PersistenceFailed
@@ -221,8 +221,13 @@ where
221221
log_error!(
222222
logger,
223223
"Committing data to key {}/{} failed due to: {}",
224-
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
225-
RGS_LATEST_SYNC_TIMESTAMP_KEY,
224+
LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
225+
LATEST_RGS_SYNC_TIMESTAMP_KEY,
226+
e
227+
);
228+
Error::PersistenceFailed
229+
})
230+
}
226231
e
227232
);
228233
Error::PersistenceFailed

src/lib.rs

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
172172
// The time in-between peer reconnection attempts.
173173
const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
174174

175+
// The time in-between RGS sync attempts.
176+
const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
177+
175178
// The time in-between node announcement broadcast attempts.
176179
const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
177180

@@ -550,7 +553,7 @@ impl Builder {
550553
));
551554

552555
// Reset the RGS sync timestamp in case we somehow switch gossip sources
553-
io::utils::write_rgs_latest_sync_timestamp(
556+
io::utils::write_latest_rgs_sync_timestamp(
554557
0,
555558
Arc::clone(&kv_store),
556559
Arc::clone(&logger),
@@ -560,7 +563,7 @@ impl Builder {
560563
}
561564
GossipSourceConfig::RapidGossipSync(rgs_server) => {
562565
let latest_sync_timestamp =
563-
io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
566+
io::utils::read_latest_rgs_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
564567
Arc::new(GossipSource::new_rgs(
565568
rgs_server.clone(),
566569
latest_sync_timestamp,
@@ -756,38 +759,39 @@ impl Node {
756759
let gossip_source = Arc::clone(&self.gossip_source);
757760
let gossip_sync_store = Arc::clone(&self.kv_store);
758761
let gossip_sync_logger = Arc::clone(&self.logger);
759-
let stop_gossip_sync = Arc::clone(&stop_running);
762+
let mut stop_gossip_sync = self.stop_receiver.clone();
760763
runtime.spawn(async move {
764+
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
761765
loop {
762-
let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
763-
let stop_gossip_sync = Arc::clone(&stop_gossip_sync);
764-
if stop_gossip_sync.load(Ordering::Acquire) {
765-
return;
766-
}
767-
768-
let now = Instant::now();
769-
match gossip_source.update_rgs_snapshot().await {
770-
Ok(updated_timestamp) => {
771-
log_info!(
772-
gossip_sync_logger,
773-
"Background sync of RGS gossip data finished in {}ms.",
774-
now.elapsed().as_millis()
775-
);
776-
io::utils::write_rgs_latest_sync_timestamp(
777-
updated_timestamp,
778-
Arc::clone(&gossip_sync_store),
779-
Arc::clone(&gossip_sync_logger),
780-
)
781-
.expect("Persistence failed");
766+
tokio::select! {
767+
_ = stop_gossip_sync.changed() => {
768+
return;
769+
}
770+
_ = interval.tick() => {
771+
let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
772+
let now = Instant::now();
773+
match gossip_source.update_rgs_snapshot().await {
774+
Ok(updated_timestamp) => {
775+
log_info!(
776+
gossip_sync_logger,
777+
"Background sync of RGS gossip data finished in {}ms.",
778+
now.elapsed().as_millis()
779+
);
780+
io::utils::write_latest_rgs_sync_timestamp(
781+
updated_timestamp,
782+
Arc::clone(&gossip_sync_store),
783+
Arc::clone(&gossip_sync_logger),
784+
)
785+
.expect("Persistence failed");
786+
}
787+
Err(e) => log_error!(
788+
gossip_sync_logger,
789+
"Background sync of RGS gossip data failed: {}",
790+
e
791+
),
792+
}
782793
}
783-
Err(e) => log_error!(
784-
gossip_sync_logger,
785-
"Background sync of RGS gossip data failed: {}",
786-
e
787-
),
788794
}
789-
790-
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
791795
}
792796
});
793797
}

0 commit comments

Comments
 (0)