Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust/cardano-chain-follower): Cleanup updating follower state after getting a new update #250

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 99 additions & 95 deletions rust/cardano-chain-follower/src/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
use pallas::network::miniprotocols::txmonitor::{TxBody, TxId};
use tokio::sync::broadcast::{self};
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::{
chain_sync::point_at_tip,
Expand Down Expand Up @@ -85,53 +85,56 @@ impl ChainFollower {

/// If we can, get the next update from the mithril snapshot.
async fn next_from_mithril(&mut self) -> Option<ChainUpdate> {
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();

// Get previous mithril tip, or set it and return current if a previous does not exist.
let previous_mithril_tip = if let Some(tip) = &self.mithril_tip {
tip
} else {
debug!(
mithril_tip = ?current_mithril_tip,
"Setting Initial Mithril Tip"
);
self.mithril_tip = Some(current_mithril_tip.clone());
&current_mithril_tip
};

// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
// This is not an advancement in the followers sequential block iterating state.
// BUT it is a necessary status to return to a follower, so it can properly handle
// when immutable state advances (if it so requires)
if current_mithril_tip != *previous_mithril_tip {
debug!(
new_tip = ?self.mithril_tip,
current_tip = ?current_mithril_tip,
"Mithril Tip has changed"
);
// We have a new mithril tip so report Mithril Tip Roll Forward
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
// Update the snapshot in the follower state to the new snapshot.
let update = ChainUpdate::new(
chain_update::Kind::ImmutableBlockRollForward,
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
block,
// This Loop allows us to re-try if we detect that the mithril snapshot has changed while
// we were trying to read a block from it. Typically this function never loops.
loop {
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();

// Get previous mithril tip, or set it and return current if a previous does not exist.
let previous_mithril_tip = self.mithril_tip.get_or_insert_with(|| {
debug!(
mithril_tip = ?current_mithril_tip,
"Setting Initial Mithril Tip"
);
current_mithril_tip.clone()
});

// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
// This is not an advancement in the followers sequential block iterating state.
// BUT it is a necessary status to return to a follower, so it can properly handle
// when immutable state advances (if it so requires)
if current_mithril_tip != *previous_mithril_tip {
debug!(
new_tip = ?self.mithril_tip,
current_tip = ?current_mithril_tip,
"Mithril Tip has changed"
);
// We have a new mithril tip so report Mithril Tip Roll Forward
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
// Update the snapshot in the follower state to the new snapshot.
let update = ChainUpdate::new(
chain_update::Kind::ImmutableBlockRollForward,
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
block,
);
return Some(update);
}
// This can only happen if the snapshot does not contain the tip block.
// So its effectively impossible/unreachable.
// However, IF it does happen, nothing bad (other than a delay to reporting
// immutable roll forward) will occur, so we log this impossible
// error, and continue processing.
error!(
tip = ?self.mithril_tip,
current = ?current_mithril_tip,
"Mithril Tip Block is not in snapshot. Should not happen."
);
return Some(update);
}
// This can only happen if the snapshot does not contain the tip block.
// So its effectively impossible/unreachable.
// However, IF it does happen, nothing bad (other than a delay to reporting
// immutable roll forward) will occur, so we log this impossible
// error, and continue processing.
error!(
tip = ?self.mithril_tip,
current = ?current_mithril_tip,
"Mithril Tip Block is not in snapshot. Should not happen."
);
}

if current_mithril_tip > self.current {
if current_mithril_tip <= self.current {
break;
}

if self.mithril_follower.is_none() {
self.mithril_follower = self
.snapshot
Expand All @@ -141,15 +144,22 @@ impl ChainFollower {

if let Some(follower) = self.mithril_follower.as_mut() {
if let Some(next) = follower.next().await {
self.previous = self.current.clone();
self.current = next.point();
self.fork = Fork::IMMUTABLE; // Mithril Immutable data is always Fork 0.
let update = ChainUpdate::new(chain_update::Kind::Block, false, next);
return Some(update);
}

// Verifying ultra rare scenario of race condition on the mithril snapshot data
// directory, where the underlying data directory could be no longer accessible
if follower.is_valid() {
break;
}
// Set the mithril follower to None and restart the loop
warn!("Detected Mithril snapshot data directory race condition, underlying data directory is not accessible anymore: Correcting...");
self.mithril_follower = None;
} else {
break;
}
}

None
}

Expand Down Expand Up @@ -217,11 +227,6 @@ impl ChainFollower {
rollback_depth,
);
}
// debug!("Pre Previous update 4 : {:?}", self.previous);
self.previous = self.current.clone();
// debug!("Post Previous update 4 : {:?}", self.previous);
self.current = next_block.point().clone();
self.fork = next_block.fork();

let tip = point_at_tip(self.chain, &self.current).await;
let update = ChainUpdate::new(update_type, tip, next_block);
Expand All @@ -232,23 +237,22 @@ impl ChainFollower {
}

/// Update the current Point, and return `false` if this fails.
fn update_current(&mut self, update: Option<&ChainUpdate>) -> bool {
if let Some(update) = update {
if update.kind == Kind::ImmutableBlockRollForward {
// The ImmutableBlockRollForward includes the Mithril TIP Block.
// Update the mithril_tip state to the point of it.
self.mithril_tip = Some(update.data.point());
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
// We DO NOT update anything else for this kind of update, as its informational and
// does not advance the state of the follower to a new block.
// It is still a valid update, and so return true, but don't update more state.
return true;
}
let decoded = update.block_data().decode();
self.current = Point::new(decoded.slot().into(), decoded.hash().into());
return true;
fn update_current(&mut self, update: &ChainUpdate) {
if update.kind == Kind::ImmutableBlockRollForward {
// The ImmutableBlockRollForward includes the Mithril TIP Block.
// Update the mithril_tip state to the point of it.
self.mithril_tip = Some(update.data.point());
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
// Reset mithril snapshot follower, because underlying data directory was renamed
self.mithril_follower = None;
// We DO NOT update anything else for this kind of update, as its informational and
// does not advance the state of the follower to a new block.
return;
}
false
// Avoids of doing unnecessary clones.
std::mem::swap(&mut self.previous, &mut self.current);
self.current = update.block_data().point();
self.fork = update.block_data().fork();
}

/// This is an unprotected version of `next()` which can ONLY be used within this
Expand All @@ -258,23 +262,20 @@ impl ChainFollower {
/// This function can NOT return None, but that state is used to help process data.
///
/// This function must not be exposed for general use.
#[allow(clippy::unused_async)]
pub(crate) async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
let mut update;

async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
// We will loop here until we can successfully return a new block
loop {
// Try and get the next update from the mithril chain, and return it if we are
// successful.
update = self.next_from_mithril().await;
if update.is_some() {
break;
if let Some(update) = self.next_from_mithril().await {
self.update_current(&update);
return Some(update);
}

// No update from Mithril Data, so try and get one from the live chain.
update = self.next_from_live_chain().await;
if update.is_some() {
break;
if let Some(update) = self.next_from_live_chain().await {
self.update_current(&update);
return Some(update);
}

// IF we can't get a new block directly from the mithril data, or the live chain, then
Expand Down Expand Up @@ -304,13 +305,6 @@ impl ChainFollower {
},
}
}

// Update the current block, so we know which one to get next.
if !self.update_current(update.as_ref()) {
return None;
}

update
}

/// Get the next block from the follower.
Expand Down Expand Up @@ -444,32 +438,42 @@ mod tests {
}

#[tokio::test]
async fn test_chain_follower_update_current_none() {
async fn test_chain_follower_update_current() {
let chain = Network::Mainnet;
let start = Point::new(100u64.into(), [0; 32].into());
let end = Point::fuzzy(999u64.into());

let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;

let result = follower.update_current(None);
let block_data = mock_block();
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);

assert!(!result);
let old_current = follower.current.clone();
follower.update_current(&update);

assert_eq!(follower.current, update.block_data().point());
assert_eq!(follower.previous, old_current);
assert_eq!(follower.fork, update.block_data().fork());
}

#[tokio::test]
async fn test_chain_follower_update_current() {
async fn test_chain_follower_update_current_immutable_roll_forward() {
let chain = Network::Mainnet;
let start = Point::new(100u64.into(), [0; 32].into());
let end = Point::fuzzy(999u64.into());

let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;

let block_data = mock_block();
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
let update = ChainUpdate::new(
chain_update::Kind::ImmutableBlockRollForward,
false,
block_data,
);

let result = follower.update_current(Some(&update.clone()));
let old_current = follower.current.clone();
follower.update_current(&update);

assert!(result);
assert_eq!(follower.current, update.block_data().point());
assert_eq!(follower.current, old_current);
}
}
18 changes: 15 additions & 3 deletions rust/cardano-chain-follower/src/mithril_snapshot_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
fmt::Debug,
path::Path,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};

Expand Down Expand Up @@ -46,6 +46,8 @@ impl Debug for MithrilSnapshotIteratorInner {
/// Wraps the iterator type returned by Pallas.
#[derive(Debug)]
pub(crate) struct MithrilSnapshotIterator {
/// Mithril snapshot directory path
path: PathBuf,
/// Inner Mutable Synchronous Iterator State
inner: Arc<Mutex<MithrilSnapshotIteratorInner>>,
}
Expand All @@ -70,6 +72,12 @@ pub(crate) fn probe_point(point: &Point, distance: u64) -> Point {
}

impl MithrilSnapshotIterator {
/// Returns `true` if the `MithrilSnapshotIterator` could read data without any issues
/// (underlying mithril snapshot directory exists)
pub(crate) fn is_valid(&self) -> bool {
self.path.exists()
}

/// Try and probe to establish the iterator from the desired point.
async fn try_fuzzy_iterator(
chain: Network, path: &Path, from: &Point, search_interval: u64,
Expand Down Expand Up @@ -123,6 +131,7 @@ impl MithrilSnapshotIterator {
};

Some(MithrilSnapshotIterator {
path: path.to_path_buf(),
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
chain,
start: this,
Expand Down Expand Up @@ -181,6 +190,7 @@ impl MithrilSnapshotIterator {
let iterator = make_mithril_iterator(path, from, chain).await?;

Ok(MithrilSnapshotIterator {
path: path.to_path_buf(),
inner: Arc::new(Mutex::new(MithrilSnapshotIteratorInner {
chain,
start: from.clone(),
Expand All @@ -196,8 +206,10 @@ impl MithrilSnapshotIterator {
let inner = self.inner.clone();

let res = task::spawn_blocking(move || {
#[allow(clippy::unwrap_used)] // Unwrap is safe here because the lock can't be poisoned.
let mut inner_iterator = inner.lock().unwrap();
#[allow(clippy::expect_used)]
let mut inner_iterator = inner
.lock()
.expect("Safe here because the lock can't be poisoned");
inner_iterator.next()
})
.await;
Expand Down