Skip to content

Commit f2bbf12

Browse files
committed
Persist latest_node_ann_broadcast_timestamp
We remember when we last broadcasted a node announcement and only re-announce if sufficient time has passed, we have at least one public channel, and we have some connected peers to gossip to.
1 parent 7161ce4 commit f2bbf12

File tree

3 files changed

+88
-11
lines changed

3 files changed

+88
-11
lines changed

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
3030
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
3131
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";
3232

33+
/// The last time we broadcast a node announcement will be persisted under this key.
34+
pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE: &str = "";
35+
pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";
36+
3337
/// Provides an interface that allows to store and retrieve persisted values that are associated
3438
/// with given keys.
3539
///

src/io/utils.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,58 @@ where
228228
Error::PersistenceFailed
229229
})
230230
}
231+
232+
pub(crate) fn read_latest_node_ann_bcast_timestamp<K: Deref>(
233+
kv_store: K,
234+
) -> Result<u64, std::io::Error>
235+
where
236+
K::Target: KVStore,
237+
{
238+
let mut reader = kv_store
239+
.read(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY)?;
240+
u64::read(&mut reader).map_err(|_| {
241+
std::io::Error::new(
242+
std::io::ErrorKind::InvalidData,
243+
"Failed to deserialize latest node announcment broadcast timestamp",
244+
)
245+
})
246+
}
247+
248+
pub(crate) fn write_latest_node_ann_bcast_timestamp<K: Deref, L: Deref>(
249+
updated_timestamp: u64, kv_store: K, logger: L,
250+
) -> Result<(), Error>
251+
where
252+
K::Target: KVStore,
253+
L::Target: Logger,
254+
{
255+
let mut writer = kv_store
256+
.write(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY)
257+
.map_err(|e| {
258+
log_error!(
259+
logger,
260+
"Getting writer for key {}/{} failed due to: {}",
261+
LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
262+
LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
263+
e
264+
);
265+
Error::PersistenceFailed
266+
})?;
267+
updated_timestamp.write(&mut writer).map_err(|e| {
268+
log_error!(
269+
logger,
270+
"Writing data to key {}/{} failed due to: {}",
271+
LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
272+
LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
273+
e
274+
);
275+
Error::PersistenceFailed
276+
})?;
277+
writer.commit().map_err(|e| {
278+
log_error!(
279+
logger,
280+
"Committing data to key {}/{} failed due to: {}",
281+
LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
282+
LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
231283
e
232284
);
233285
Error::PersistenceFailed

src/lib.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -910,30 +910,51 @@ impl Node {
910910
let bcast_cm = Arc::clone(&self.channel_manager);
911911
let bcast_pm = Arc::clone(&self.peer_manager);
912912
let bcast_config = Arc::clone(&self.config);
913+
let bcast_store = Arc::clone(&self.kv_store);
914+
let bcast_logger = Arc::clone(&self.logger);
913915
let mut stop_bcast = self.stop_receiver.clone();
914916
runtime.spawn(async move {
915-
let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
917+
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
918+
let mut interval = tokio::time::interval(Duration::from_secs(30));
916919
loop {
917920
tokio::select! {
918921
_ = stop_bcast.changed() => {
919922
return;
920923
}
921-
_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
922-
while bcast_pm.get_peer_node_ids().is_empty() {
923-
// Sleep a bit and retry if we don't have any peers yet.
924-
tokio::time::sleep(Duration::from_secs(5)).await;
925-
926-
// Check back if we need to stop.
927-
match stop_bcast.has_changed() {
928-
Ok(false) => {},
929-
Ok(true) => return,
930-
Err(_) => return,
924+
_ = interval.tick() => {
925+
let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store)) {
926+
Ok(latest_bcast_time_secs) => {
927+
// Skip if the time hasn't elapsed yet.
928+
let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
929+
next_bcast_unix_time.elapsed().is_err()
930+
}
931+
Err(_) => {
932+
// Don't skip if we haven't broadcasted before.
933+
false
931934
}
935+
};
936+
937+
if skip_broadcast {
938+
continue;
939+
}
940+
941+
if bcast_cm.list_channels().iter().any(|chan| chan.is_public) {
942+
// Skip if we don't have any public channels.
943+
continue;
944+
}
945+
946+
if bcast_pm.get_peer_node_ids().is_empty() {
947+
// Skip if we don't have any connected peers to gossip to.
948+
continue;
932949
}
933950

934951
let addresses =
935952
bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
936953
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
954+
955+
let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
956+
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
957+
.expect("Persistence failed");
937958
}
938959
}
939960
}

0 commit comments

Comments
 (0)