-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchain_sync_config.rs
166 lines (145 loc) · 5.11 KB
/
chain_sync_config.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//! Cardano chain sync configuration.
//!
//! Independent of ANY followers, we allow a maximum of 3 Chains being updated, one for
//! each network. Chain Followers use the data supplied by the Chain-Sync.
//! This module configures the chain sync processes.
use std::sync::LazyLock;
use cardano_blockchain_types::Network;
use dashmap::DashMap;
use strum::IntoEnumIterator;
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, error};
use crate::{
chain_sync::chain_sync,
error::{Error, Result},
mithril_snapshot_config::MithrilSnapshotConfig,
stats,
};
/// Default Follower block buffer size.
const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;
/// How many window (in slot) back from TIP is considered Immutable in the
/// absence of a mithril snapshot.
const DEFAULT_IMMUTABLE_SLOT_WINDOW: u64 = 12 * 60 * 60;
/// Type we use to manage the Sync Task handle map.
type SyncMap = DashMap<Network, Mutex<Option<JoinHandle<()>>>>;
/// Handle to the mithril sync thread. One for each Network ONLY.
static SYNC_JOIN_HANDLE_MAP: LazyLock<SyncMap> = LazyLock::new(|| {
let map = DashMap::new();
for network in Network::iter() {
map.insert(network, Mutex::new(None));
}
map
});
/// A Follower Connection to the Cardano Network.
#[derive(Clone, Debug)]
pub struct ChainSyncConfig {
/// Chain Network
pub chain: Network,
/// Relay Node Address
pub(crate) relay_address: String,
/// Block buffer size option.
chain_update_buffer_size: usize,
/// If we don't have immutable data, how far back from TIP is the data considered
/// Immutable (in slots).
immutable_slot_window: u64,
/// Configuration of Mithril Snapshots.
pub mithril_cfg: MithrilSnapshotConfig,
}
impl ChainSyncConfig {
/// Sets the defaults for a given cardano network.
/// Each network has a different set of defaults, so no single "default" can apply.
/// This function is preferred to the `default()` standard function.
#[must_use]
pub fn default_for(chain: Network) -> Self {
Self {
chain,
relay_address: chain.default_relay(),
chain_update_buffer_size: DEFAULT_CHAIN_UPDATE_BUFFER_SIZE,
immutable_slot_window: DEFAULT_IMMUTABLE_SLOT_WINDOW,
mithril_cfg: MithrilSnapshotConfig::default_for(chain),
}
}
/// Sets the relay address to use for Chain Sync.
///
/// # Arguments
///
/// * `relay`: Address to use for the blockchain relay node.
#[must_use]
pub fn relay(mut self, address: String) -> Self {
self.relay_address = address;
self
}
/// Sets the size of the chain updates buffer used by the Follower.
///
/// # Arguments
///
/// * `chain_update_buffer_size`: Size of the chain updates buffer.
#[must_use]
pub fn chain_update_buffer_size(mut self, block_buffer_size: usize) -> Self {
self.chain_update_buffer_size = block_buffer_size;
self
}
/// Sets the size of the Immutable window used when Mithril is not available.
///
/// # Arguments
///
/// * `window`: Size of the Immutable window.
#[must_use]
pub fn immutable_slot_window(mut self, window: u64) -> Self {
self.immutable_slot_window = window;
self
}
/// Sets the Mithril snapshot Config the `ChainSync` will use.
///
/// # Arguments
///
/// * `cfg`: Mithril snapshot configuration.
#[must_use]
pub fn mithril_cfg(mut self, cfg: MithrilSnapshotConfig) -> Self {
self.mithril_cfg = cfg;
self
}
/// Runs Chain Synchronization.
///
/// Must be done BEFORE the chain can be followed.
///
/// # Returns
///
/// `Result<()>`: On success.
///
/// # Errors
///
/// `Error`: On error.
pub async fn run(self) -> Result<()> {
debug!(
chain = self.chain.to_string(),
"Chain Synchronization Starting"
);
stats::sync_started(self.chain);
// Start the Chain Sync - IFF its not already running.
let lock_entry = match SYNC_JOIN_HANDLE_MAP.get(&self.chain) {
None => {
error!("Join Map improperly initialized: Missing {}!!", self.chain);
return Err(Error::Internal); // Should not get here.
},
Some(entry) => entry,
};
let mut locked_handle = lock_entry.value().lock().await;
if (*locked_handle).is_some() {
debug!("Chain Sync Already Running for {}", self.chain);
return Err(Error::ChainSyncAlreadyRunning(self.chain));
}
// Start the Mithril Snapshot Follower
let rx = self.mithril_cfg.run().await?;
let config = self.clone();
// Start Chain Sync
*locked_handle = Some(tokio::spawn(async move {
stats::start_thread(config.chain, stats::thread::name::CHAIN_SYNC, true);
chain_sync(config.clone(), rx).await;
stats::stop_thread(config.chain, stats::thread::name::CHAIN_SYNC);
}));
// sync_map.insert(chain, handle);
debug!("Chain Sync for {} : Started", self.chain);
Ok(())
}
}