Skip to content

Commit cbb929c

Browse files
Mr-Leshiy and stevenj authored
chore(rust/cardano-chain-follower): Cleanup updating follower state after getting a new update (#250)
* cleanup update_current method usage
* wip
* add tests
* Update rust/cardano-chain-follower/src/follow.rs Co-authored-by: Steven Johnson <[email protected]>
* Update rust/cardano-chain-follower/src/follow.rs Co-authored-by: Steven Johnson <[email protected]>
* wip
* fix spelling

---------

Co-authored-by: Steven Johnson <[email protected]>
1 parent 2ef16a9 commit cbb929c

File tree

2 files changed

+114
-98
lines changed

2 files changed

+114
-98
lines changed

rust/cardano-chain-follower/src/follow.rs

+99-95
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
44
use pallas::network::miniprotocols::txmonitor::{TxBody, TxId};
55
use tokio::sync::broadcast::{self};
6-
use tracing::{debug, error};
6+
use tracing::{debug, error, warn};
77

88
use crate::{
99
chain_sync::point_at_tip,
@@ -85,53 +85,56 @@ impl ChainFollower {
8585

8686
/// If we can, get the next update from the mithril snapshot.
8787
async fn next_from_mithril(&mut self) -> Option<ChainUpdate> {
88-
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
89-
90-
// Get previous mithril tip, or set it and return current if a previous does not exist.
91-
let previous_mithril_tip = if let Some(tip) = &self.mithril_tip {
92-
tip
93-
} else {
94-
debug!(
95-
mithril_tip = ?current_mithril_tip,
96-
"Setting Initial Mithril Tip"
97-
);
98-
self.mithril_tip = Some(current_mithril_tip.clone());
99-
&current_mithril_tip
100-
};
101-
102-
// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
103-
// This is not an advancement in the followers sequential block iterating state.
104-
// BUT it is a necessary status to return to a follower, so it can properly handle
105-
// when immutable state advances (if it so requires)
106-
if current_mithril_tip != *previous_mithril_tip {
107-
debug!(
108-
new_tip = ?self.mithril_tip,
109-
current_tip = ?current_mithril_tip,
110-
"Mithril Tip has changed"
111-
);
112-
// We have a new mithril tip so report Mithril Tip Roll Forward
113-
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
114-
// Update the snapshot in the follower state to the new snapshot.
115-
let update = ChainUpdate::new(
116-
chain_update::Kind::ImmutableBlockRollForward,
117-
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
118-
block,
88+
// This Loop allows us to re-try if we detect that the mithril snapshot has changed while
89+
// we were trying to read a block from it. Typically this function never loops.
90+
loop {
91+
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
92+
93+
// Get previous mithril tip, or set it and return current if a previous does not exist.
94+
let previous_mithril_tip = self.mithril_tip.get_or_insert_with(|| {
95+
debug!(
96+
mithril_tip = ?current_mithril_tip,
97+
"Setting Initial Mithril Tip"
98+
);
99+
current_mithril_tip.clone()
100+
});
101+
102+
// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
103+
// This is not an advancement in the followers sequential block iterating state.
104+
// BUT it is a necessary status to return to a follower, so it can properly handle
105+
// when immutable state advances (if it so requires)
106+
if current_mithril_tip != *previous_mithril_tip {
107+
debug!(
108+
new_tip = ?self.mithril_tip,
109+
current_tip = ?current_mithril_tip,
110+
"Mithril Tip has changed"
111+
);
112+
// We have a new mithril tip so report Mithril Tip Roll Forward
113+
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
114+
// Update the snapshot in the follower state to the new snapshot.
115+
let update = ChainUpdate::new(
116+
chain_update::Kind::ImmutableBlockRollForward,
117+
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
118+
block,
119+
);
120+
return Some(update);
121+
}
122+
// This can only happen if the snapshot does not contain the tip block.
123+
// So its effectively impossible/unreachable.
124+
// However, IF it does happen, nothing bad (other than a delay to reporting
125+
// immutable roll forward) will occur, so we log this impossible
126+
// error, and continue processing.
127+
error!(
128+
tip = ?self.mithril_tip,
129+
current = ?current_mithril_tip,
130+
"Mithril Tip Block is not in snapshot. Should not happen."
119131
);
120-
return Some(update);
121132
}
122-
// This can only happen if the snapshot does not contain the tip block.
123-
// So its effectively impossible/unreachable.
124-
// However, IF it does happen, nothing bad (other than a delay to reporting
125-
// immutable roll forward) will occur, so we log this impossible
126-
// error, and continue processing.
127-
error!(
128-
tip = ?self.mithril_tip,
129-
current = ?current_mithril_tip,
130-
"Mithril Tip Block is not in snapshot. Should not happen."
131-
);
132-
}
133133

134-
if current_mithril_tip > self.current {
134+
if current_mithril_tip <= self.current {
135+
break;
136+
}
137+
135138
if self.mithril_follower.is_none() {
136139
self.mithril_follower = self
137140
.snapshot
@@ -141,15 +144,22 @@ impl ChainFollower {
141144

142145
if let Some(follower) = self.mithril_follower.as_mut() {
143146
if let Some(next) = follower.next().await {
144-
self.previous = self.current.clone();
145-
self.current = next.point();
146-
self.fork = Fork::IMMUTABLE; // Mithril Immutable data is always Fork 0.
147147
let update = ChainUpdate::new(chain_update::Kind::Block, false, next);
148148
return Some(update);
149149
}
150+
151+
// Verifying ultra rare scenario of race condition on the mithril snapshot data
152+
// directory, where the underlying data directory could be no longer accessible
153+
if follower.is_valid() {
154+
break;
155+
}
156+
// Set the mithril follower to None and restart the loop
157+
warn!("Detected Mithril snapshot data directory race condition, underlying data directory is not accessible anymore: Correcting...");
158+
self.mithril_follower = None;
159+
} else {
160+
break;
150161
}
151162
}
152-
153163
None
154164
}
155165

@@ -217,11 +227,6 @@ impl ChainFollower {
217227
rollback_depth,
218228
);
219229
}
220-
// debug!("Pre Previous update 4 : {:?}", self.previous);
221-
self.previous = self.current.clone();
222-
// debug!("Post Previous update 4 : {:?}", self.previous);
223-
self.current = next_block.point().clone();
224-
self.fork = next_block.fork();
225230

226231
let tip = point_at_tip(self.chain, &self.current).await;
227232
let update = ChainUpdate::new(update_type, tip, next_block);
@@ -232,23 +237,22 @@ impl ChainFollower {
232237
}
233238

234239
/// Update the follower state from the given update: the Mithril tip for an
/// `ImmutableBlockRollForward`, otherwise the previous/current Point and the fork.
235-
fn update_current(&mut self, update: Option<&ChainUpdate>) -> bool {
236-
if let Some(update) = update {
237-
if update.kind == Kind::ImmutableBlockRollForward {
238-
// The ImmutableBlockRollForward includes the Mithril TIP Block.
239-
// Update the mithril_tip state to the point of it.
240-
self.mithril_tip = Some(update.data.point());
241-
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
242-
// We DO NOT update anything else for this kind of update, as its informational and
243-
// does not advance the state of the follower to a new block.
244-
// It is still a valid update, and so return true, but don't update more state.
245-
return true;
246-
}
247-
let decoded = update.block_data().decode();
248-
self.current = Point::new(decoded.slot().into(), decoded.hash().into());
249-
return true;
240+
fn update_current(&mut self, update: &ChainUpdate) {
241+
if update.kind == Kind::ImmutableBlockRollForward {
242+
// The ImmutableBlockRollForward includes the Mithril TIP Block.
243+
// Update the mithril_tip state to the point of it.
244+
self.mithril_tip = Some(update.data.point());
245+
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
246+
// Reset mithril snapshot follower, because underlying data directory was renamed
247+
self.mithril_follower = None;
248+
// We DO NOT update anything else for this kind of update, as its informational and
249+
// does not advance the state of the follower to a new block.
250+
return;
250251
}
251-
false
252+
// Avoids of doing unnecessary clones.
253+
std::mem::swap(&mut self.previous, &mut self.current);
254+
self.current = update.block_data().point();
255+
self.fork = update.block_data().fork();
252256
}
253257

254258
/// This is an unprotected version of `next()` which can ONLY be used within this
@@ -258,23 +262,20 @@ impl ChainFollower {
258262
/// This function can NOT return None, but that state is used to help process data.
259263
///
260264
/// This function must not be exposed for general use.
261-
#[allow(clippy::unused_async)]
262-
pub(crate) async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
263-
let mut update;
264-
265+
async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
265266
// We will loop here until we can successfully return a new block
266267
loop {
267268
// Try and get the next update from the mithril chain, and return it if we are
268269
// successful.
269-
update = self.next_from_mithril().await;
270-
if update.is_some() {
271-
break;
270+
if let Some(update) = self.next_from_mithril().await {
271+
self.update_current(&update);
272+
return Some(update);
272273
}
273274

274275
// No update from Mithril Data, so try and get one from the live chain.
275-
update = self.next_from_live_chain().await;
276-
if update.is_some() {
277-
break;
276+
if let Some(update) = self.next_from_live_chain().await {
277+
self.update_current(&update);
278+
return Some(update);
278279
}
279280

280281
// IF we can't get a new block directly from the mithril data, or the live chain, then
@@ -304,13 +305,6 @@ impl ChainFollower {
304305
},
305306
}
306307
}
307-
308-
// Update the current block, so we know which one to get next.
309-
if !self.update_current(update.as_ref()) {
310-
return None;
311-
}
312-
313-
update
314308
}
315309

316310
/// Get the next block from the follower.
@@ -444,32 +438,42 @@ mod tests {
444438
}
445439

446440
#[tokio::test]
447-
async fn test_chain_follower_update_current_none() {
441+
async fn test_chain_follower_update_current() {
448442
let chain = Network::Mainnet;
449443
let start = Point::new(100u64.into(), [0; 32].into());
450444
let end = Point::fuzzy(999u64.into());
451445

452446
let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;
453447

454-
let result = follower.update_current(None);
448+
let block_data = mock_block();
449+
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
455450

456-
assert!(!result);
451+
let old_current = follower.current.clone();
452+
follower.update_current(&update);
453+
454+
assert_eq!(follower.current, update.block_data().point());
455+
assert_eq!(follower.previous, old_current);
456+
assert_eq!(follower.fork, update.block_data().fork());
457457
}
458458

459459
#[tokio::test]
460-
async fn test_chain_follower_update_current() {
460+
async fn test_chain_follower_update_current_immutable_roll_forward() {
461461
let chain = Network::Mainnet;
462462
let start = Point::new(100u64.into(), [0; 32].into());
463463
let end = Point::fuzzy(999u64.into());
464464

465465
let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;
466466

467467
let block_data = mock_block();
468-
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
468+
let update = ChainUpdate::new(
469+
chain_update::Kind::ImmutableBlockRollForward,
470+
false,
471+
block_data,
472+
);
469473

470-
let result = follower.update_current(Some(&update.clone()));
474+
let old_current = follower.current.clone();
475+
follower.update_current(&update);
471476

472-
assert!(result);
473-
assert_eq!(follower.current, update.block_data().point());
477+
assert_eq!(follower.current, old_current);
474478
}
475479
}

rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs

+15-3
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
33
use std::{
44
fmt::Debug,
5-
path::Path,
5+
path::{Path, PathBuf},
66
sync::{Arc, Mutex},
77
};
88

@@ -46,6 +46,8 @@ impl Debug for MithrilSnapshotIteratorInner {
4646
/// Wraps the iterator type returned by Pallas.
4747
#[derive(Debug)]
4848
pub(crate) struct MithrilSnapshotIterator {
49+
/// Mithril snapshot directory path
50+
path: PathBuf,
4951
/// Inner Mutable Synchronous Iterator State
5052
inner: Arc<Mutex<MithrilSnapshotIteratorInner>>,
5153
}
@@ -70,6 +72,12 @@ pub(crate) fn probe_point(point: &Point, distance: u64) -> Point {
7072
}
7173

7274
impl MithrilSnapshotIterator {
75+
/// Returns `true` if the `MithrilSnapshotIterator` could read data without any issues
76+
/// (underlying mithril snapshot directory exists)
77+
pub(crate) fn is_valid(&self) -> bool {
78+
self.path.exists()
79+
}
80+
7381
/// Try and probe to establish the iterator from the desired point.
7482
async fn try_fuzzy_iterator(
7583
chain: Network, path: &Path, from: &Point, search_interval: u64,
@@ -123,6 +131,7 @@ impl MithrilSnapshotIterator {
123131
};
124132

125133
Some(MithrilSnapshotIterator {
134+
path: path.to_path_buf(),
126135
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
127136
chain,
128137
start: this,
@@ -181,6 +190,7 @@ impl MithrilSnapshotIterator {
181190
let iterator = make_mithril_iterator(path, from, chain).await?;
182191

183192
Ok(MithrilSnapshotIterator {
193+
path: path.to_path_buf(),
184194
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
185195
chain,
186196
start: from.clone(),
@@ -196,8 +206,10 @@ impl MithrilSnapshotIterator {
196206
let inner = self.inner.clone();
197207

198208
let res = task::spawn_blocking(move || {
199-
#[allow(clippy::unwrap_used)] // Unwrap is safe here because the lock can't be poisoned.
200-
let mut inner_iterator = inner.lock().unwrap();
209+
#[allow(clippy::expect_used)]
210+
let mut inner_iterator = inner
211+
.lock()
212+
.expect("Safe here because the lock can't be poisoned");
201213
inner_iterator.next()
202214
})
203215
.await;

0 commit comments

Comments
 (0)