-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchain_sync_ready.rs
172 lines (144 loc) · 5.82 KB
/
chain_sync_ready.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
//! Flag that controls whether chain sync for a blockchain is ready.
//! Blockchain data cannot be consumed until it is.
use std::{sync::LazyLock, time::Duration};
use cardano_blockchain_types::Network;
use dashmap::DashMap;
use strum::IntoEnumIterator;
use tokio::{
sync::{broadcast, oneshot, RwLock},
time::sleep,
};
use tracing::error;
use crate::{chain_update, stats};
/// Data we hold related to sync being ready or not.
///
/// One instance exists per network inside `SYNC_READY`, each guarded by its own
/// async `RwLock`.
struct SyncReady {
/// MPMC Receive queue for Blockchain Updates.
///
/// Within this file it is only used as the template that new subscribers are
/// `resubscribe()`d from (see `get_chain_update_rx_queue`).
rx: broadcast::Receiver<chain_update::Kind>,
/// MPMC Transmit queue for Blockchain Updates.
///
/// Clones of this sender are handed out by `get_chain_update_tx_queue`.
tx: broadcast::Sender<chain_update::Kind>,
/// Sync is ready flag. (Prevents data race conditions)
///
/// Starts as `false` and is set to `true` by the task spawned in
/// `wait_for_sync_ready` once it has been signalled.
ready: bool,
}
impl SyncReady {
    /// Build the initial (not yet ready) sync state with a fresh broadcast channel.
    fn new() -> Self {
        // The channel holds at most 3 pending update messages; slower receivers
        // start lagging beyond that.
        let (sender, receiver) = broadcast::channel(3);

        Self {
            rx: receiver,
            tx: sender,
            ready: false,
        }
    }
}
/// Send a chain update to any subscribers that are listening.
///
/// Does nothing when `update_sender` is `None`. A failed broadcast is logged
/// against `chain` and otherwise ignored.
pub(crate) fn notify_follower(
    chain: Network, update_sender: Option<&broadcast::Sender<chain_update::Kind>>,
    kind: &chain_update::Kind,
) {
    // No sender available means there is nothing to notify.
    let Some(sender) = update_sender else {
        return;
    };

    // The sender takes ownership of the message, so broadcast a clone.
    if let Err(error) = sender.send(kind.clone()) {
        error!(
            chain = chain.to_string(),
            "Failed to broadcast the Update {kind} : {error}"
        );
    }
}
/// Waiter for sync to become ready, use `signal` when it is.
///
/// Returned by `wait_for_sync_ready`; it owns the sending half of the oneshot
/// channel that the spawned waiting task is listening on.
pub(crate) struct SyncReadyWaiter {
/// The oneshot queue we use to signal ready.
///
/// `Some` until `signal` is called, which takes the sender out and leaves `None`.
signal: Option<oneshot::Sender<()>>,
}
impl SyncReadyWaiter {
    /// Signal that sync is ready.
    ///
    /// Consumes the stored oneshot sender, so only the first call can signal.
    /// A failed send, or any later call, is logged as an error rather than
    /// returned to the caller.
    pub(crate) fn signal(&mut self) {
        match self.signal.take() {
            Some(signaler) => {
                if let Err(error) = signaler.send(()) {
                    error!("sync ready waiter signal should not fail: {error:?}");
                }
            },
            None => {
                error!("sync ready waiter signal should not be called more than once.");
            },
        }
    }
}
/// Per-network lock that prevents any blockchain data being used UNTIL that network
/// is synced to TIP.
/// An entry is created for every `Network` variant on first access, so it's safe to
/// use `expect` when looking a network up.
static SYNC_READY: LazyLock<DashMap<Network, RwLock<SyncReady>>> = LazyLock::new(|| {
    Network::iter()
        .map(|network| (network, RwLock::new(SyncReady::new())))
        .collect()
});
/// Write Lock the `SYNC_READY` lock for a network.
/// When we are signaled to be ready, set it to true and release the lock.
pub(crate) fn wait_for_sync_ready(chain: Network) -> SyncReadyWaiter {
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(async move {
stats::start_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY, true);
// We are safe to use `expect` here because the SYNC_READY list is exhaustively
// initialized. Its a Serious BUG if that not True, so panic is OK.
#[allow(clippy::expect_used)]
let lock_entry = SYNC_READY.get(&chain).expect("network should exist");
let lock = lock_entry.value();
let mut status = lock.write().await;
// If we successfully get told to unlock, we do.
if let Ok(()) = rx.await {
status.ready = true;
}
stats::stop_thread(chain, stats::thread::name::WAIT_FOR_SYNC_READY);
// If the channel closes early, we can NEVER use the Blockchain data.
});
SyncReadyWaiter { signal: Some(tx) }
}
/// Take a read lock on the sync state of `chain` and report its `ready` flag.
///
/// Waits for as long as a writer holds the lock for that network.
async fn check_sync_ready(chain: Network) -> bool {
    // We are safe to use `expect` here because the SYNC_READY list is exhaustively
    // initialized. Its a Serious BUG if that not True, so panic is OK.
    #[allow(clippy::expect_used)]
    let entry = SYNC_READY.get(&chain).expect("network should exist");

    let state = entry.value().read().await;
    state.ready
}
/// Number of seconds to wait if we detect a `SyncReady` race condition.
///
/// Used by `block_until_sync_ready` as the delay between successive readiness checks.
const SYNC_READY_RACE_BACKOFF_SECS: u64 = 1;
/// Block until the chain is synced to TIP.
/// This is necessary to ensure the Blockchain data is fully intact before attempting to
/// consume it.
pub(crate) async fn block_until_sync_ready(chain: Network) {
    let backoff = Duration::from_secs(SYNC_READY_RACE_BACKOFF_SECS);

    // The SYNC_READY write lock may not have been taken yet when we first check, in
    // which case the state reads as "not ready". Back off and poll again until the
    // state finally reports ready.
    loop {
        if check_sync_ready(chain).await {
            return;
        }
        sleep(backoff).await;
    }
}
/// Get a new Broadcast Receive queue for updates on the given chain.
///
/// Waits for a read lock on the chain's sync state, then re-subscribes from the
/// stored receiver.
pub(crate) async fn get_chain_update_rx_queue(
    chain: Network,
) -> broadcast::Receiver<chain_update::Kind> {
    // We are safe to use `expect` here because the SYNC_READY list is exhaustively
    // initialized. Its a Serious BUG if that not True, so panic is OK.
    #[allow(clippy::expect_used)]
    let entry = SYNC_READY.get(&chain).expect("network should exist");

    let state = entry.value().read().await;
    state.rx.resubscribe()
}
/// Get the Broadcast Transmit queue for updates on the given chain.
///
/// Only attempts a non-waiting read lock: returns `None` when the read lock cannot
/// be acquired immediately, otherwise a clone of the stored sender.
pub(crate) async fn get_chain_update_tx_queue(
    chain: Network,
) -> Option<broadcast::Sender<chain_update::Kind>> {
    // We are safe to use `expect` here because the SYNC_READY list is exhaustively
    // initialized. Its a Serious BUG if that not True, so panic is OK.
    #[allow(clippy::expect_used)]
    let entry = SYNC_READY.get(&chain).expect("network should exist");

    entry
        .value()
        .try_read()
        .ok()
        .map(|state| state.tx.clone())
}