Skip to content

fix(rust/cardano-chain-follower): fix immutable roll forward events #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ run-mithril-download-example-preview: code-format code-lint
run-mithril-download-example-mainnet: code-format code-lint
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
./target/release/examples/follow_chains --mainnet
./target/release/examples/follow_chains --mainnet --mithril-sync-workers 64 --mithril-sync-chunk-size 16 --mithril-sync-queue-ahead=6

# Run long running developer test for mithril downloading.
debug-heap-mithril-download-example: code-format code-lint
Expand Down
12 changes: 12 additions & 0 deletions rust/cardano-chain-follower/examples/follow_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ async fn follow_for(network: Network, matches: ArgMatches) {
while let Some(chain_update) = follower.next().await {
updates = updates.saturating_add(1);

if chain_update.kind == Kind::ImmutableBlockRollForward {
let immutable_tip = u64::from(chain_update.data.point().slot_or_default());
let immutable = chain_update.immutable();
info!(
updates = updates,
immutable_tip = immutable_tip,
immutable = immutable,
"Chain Immutable Roll Forward Detected."
);
continue; // Nothing else to do, so get next sequential block.
}

if chain_update.tip {
reached_tip = true;
}
Expand Down
3 changes: 2 additions & 1 deletion rust/cardano-chain-follower/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,11 @@ async fn live_sync_backfill_and_purge(
}

debug!(
"After Purge: Size of the Live Chain is: {} Blocks",
"After Purge: Size of the Live Chain is: {} Blocks: Triggering Sleeping Followers.",
live_chain_length(cfg.chain)
);

// Trigger any sleeping followers that data has changed.
notify_follower(
cfg.chain,
update_sender.as_ref(),
Expand Down
191 changes: 115 additions & 76 deletions rust/cardano-chain-follower/src/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
use pallas::network::miniprotocols::txmonitor::{TxBody, TxId};
use tokio::sync::broadcast::{self};
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::{
chain_sync::point_at_tip,
Expand All @@ -14,7 +14,7 @@ use crate::{
mithril_snapshot_data::latest_mithril_snapshot_id,
mithril_snapshot_iterator::MithrilSnapshotIterator,
stats::{self},
Statistics,
Kind, Statistics,
};

/// The Chain Follower
Expand Down Expand Up @@ -85,9 +85,56 @@ impl ChainFollower {

/// If we can, get the next update from the mithril snapshot.
async fn next_from_mithril(&mut self) -> Option<ChainUpdate> {
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
// This Loop allows us to re-try if we detect that the mithril snapshot has changed while
// we were trying to read a block from it. Typically this function never loops.
loop {
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();

// Get previous mithril tip, or set it and return current if a previous does not exist.
let previous_mithril_tip = self.mithril_tip.get_or_insert_with(|| {
debug!(
mithril_tip = ?current_mithril_tip,
"Setting Initial Mithril Tip"
);
current_mithril_tip.clone()
});

// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
// This is not an advancement in the follower's sequential block iterating state,
// BUT it is a necessary status to return to a follower, so it can properly handle
// the immutable state advancing (if it so requires).
if current_mithril_tip != *previous_mithril_tip {
debug!(
new_tip = ?self.mithril_tip,
current_tip = ?current_mithril_tip,
"Mithril Tip has changed"
);
// We have a new mithril tip so report Mithril Tip Roll Forward
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
// Update the snapshot in the follower state to the new snapshot.
let update = ChainUpdate::new(
chain_update::Kind::ImmutableBlockRollForward,
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
block,
);
return Some(update);
}
// This can only happen if the snapshot does not contain the tip block.
// So it's effectively impossible/unreachable.
// However, IF it does happen, nothing bad (other than a delay in reporting
// the immutable roll forward) will occur, so we log this impossible
// error, and continue processing.
error!(
tip = ?self.mithril_tip,
current = ?current_mithril_tip,
"Mithril Tip Block is not in snapshot. Should not happen."
);
}

if current_mithril_tip <= self.current {
break;
}

if current_mithril_tip > self.current {
if self.mithril_follower.is_none() {
self.mithril_follower = self
.snapshot
Expand All @@ -97,40 +144,22 @@ impl ChainFollower {

if let Some(follower) = self.mithril_follower.as_mut() {
if let Some(next) = follower.next().await {
// debug!("Pre Previous update 3 : {:?}", self.previous);
self.previous = self.current.clone();
// debug!("Post Previous update 3 : {:?}", self.previous);
self.current = next.point();
self.fork = Fork::IMMUTABLE; // Mithril Immutable data is always Fork 0.
let update = ChainUpdate::new(chain_update::Kind::Block, false, next);
return Some(update);
}
}
}

let roll_forward_condition = if let Some(mithril_tip) = &self.mithril_tip {
current_mithril_tip > *mithril_tip && *mithril_tip > self.current
} else {
true
};

if roll_forward_condition {
let snapshot = MithrilSnapshot::new(self.chain);
if let Some(block) = snapshot.read_block_at(&current_mithril_tip).await {
// The Mithril Tip has moved forwards.
self.mithril_tip = Some(current_mithril_tip);
// Get the mithril tip block.
let update =
ChainUpdate::new(chain_update::Kind::ImmutableBlockRollForward, false, block);
return Some(update);
// Check for the ultra-rare race condition on the mithril snapshot data
// directory, where the underlying data directory is no longer accessible.
if follower.is_valid() {
break;
}
// Set the mithril follower to None and restart the loop
warn!("Detected Mithril snapshot data directory race condition, underlying data directory is not accessible anymore: Correcting...");
self.mithril_follower = None;
} else {
break;
}
error!(
tip = ?self.mithril_tip,
current = ?current_mithril_tip,
"Mithril Tip Block is not in snapshot. Should not happen."
);
}

None
}

Expand Down Expand Up @@ -198,11 +227,6 @@ impl ChainFollower {
rollback_depth,
);
}
// debug!("Pre Previous update 4 : {:?}", self.previous);
self.previous = self.current.clone();
// debug!("Post Previous update 4 : {:?}", self.previous);
self.current = next_block.point().clone();
self.fork = next_block.fork();

let tip = point_at_tip(self.chain, &self.current).await;
let update = ChainUpdate::new(update_type, tip, next_block);
Expand All @@ -213,13 +237,22 @@ impl ChainFollower {
}

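/// Update the follower's state from an update: an `ImmutableBlockRollForward` only refreshes
/// the Mithril tip, while any other update advances the current Point.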
fn update_current(&mut self, update: Option<&ChainUpdate>) -> bool {
if let Some(update) = update {
let decoded = update.block_data().decode();
self.current = Point::new(decoded.slot().into(), decoded.hash().into());
return true;
fn update_current(&mut self, update: &ChainUpdate) {
if update.kind == Kind::ImmutableBlockRollForward {
// The ImmutableBlockRollForward includes the Mithril TIP Block.
// Update the mithril_tip state to the point of it.
self.mithril_tip = Some(update.data.point());
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
// Reset mithril snapshot follower, because underlying data directory was renamed
self.mithril_follower = None;
// We DO NOT update anything else for this kind of update, as it's informational and
// does not advance the state of the follower to a new block.
return;
}
false
// Swap rather than clone: `previous` takes the old `current`, which is overwritten next.
std::mem::swap(&mut self.previous, &mut self.current);
self.current = update.block_data().point();
self.fork = update.block_data().fork();
}

/// This is an unprotected version of `next()` which can ONLY be used within this
Expand All @@ -229,53 +262,49 @@ impl ChainFollower {
/// This function can NOT return None, but that state is used to help process data.
///
/// This function must not be exposed for general use.
#[allow(clippy::unused_async)]
pub(crate) async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
let mut update;

async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
// We will loop here until we can successfully return a new block
loop {
// Check if Immutable TIP has advanced, and if so, send a ChainUpdate about it.
// Should only happen once every ~6hrs.
// TODO.

// Try and get the next update from the mithril chain, and return it if we are
// successful.
update = self.next_from_mithril().await;
if update.is_some() {
break;
if let Some(update) = self.next_from_mithril().await {
self.update_current(&update);
return Some(update);
}

// No update from Mithril Data, so try and get one from the live chain.
update = self.next_from_live_chain().await;
if update.is_some() {
break;
if let Some(update) = self.next_from_live_chain().await {
self.update_current(&update);
return Some(update);
}

// IF we can't get a new block directly from the mithril data, or the live chain, then
// wait for something to change which might mean we can get the next block.
let update = self.sync_updates.recv().await;
match update {
// Note, this is JUST a trigger, we don't process based on it other than to allow
// a blocked follower to continue.
let changed_data_trigger = self.sync_updates.recv().await;
match changed_data_trigger {
Ok(kind) => {
// The KIND of event signaling changed data is not important, but we do log it
// to help with debugging in case an update stops.
debug!("Update kind: {kind}");
},
Err(tokio::sync::broadcast::error::RecvError::Lagged(distance)) => {
// The update queue is small, so it's possible that it fills before a task can
// read from it, which causes this Lagged error.
// BUT, because we don't care what the event was, this is as good as the missed
// event. Therefore it's not an error, and we just log it at debug to help with
// debugging the logic only.
debug!("Lagged by {} updates", distance);
},
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
// We are closed, so we need to wait for the next update.
// The queue is closed, so we need to report that it's no longer possible to
// get data from this follower.
// This is not an error.
return None;
},
}
}

// Update the current block, so we know which one to get next.
if !self.update_current(update.as_ref()) {
return None;
}

update
}

/// Get the next block from the follower.
Expand Down Expand Up @@ -405,36 +434,46 @@ mod tests {
assert_eq!(follower.current, start);
assert_eq!(follower.fork, 1.into());
assert!(follower.mithril_follower.is_none());
assert!(follower.mithril_tip.is_none());
// assert!(follower.mithril_tip.is_none());
}

#[tokio::test]
async fn test_chain_follower_update_current_none() {
async fn test_chain_follower_update_current() {
let chain = Network::Mainnet;
let start = Point::new(100u64.into(), [0; 32].into());
let end = Point::fuzzy(999u64.into());

let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;

let result = follower.update_current(None);
let block_data = mock_block();
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);

assert!(!result);
let old_current = follower.current.clone();
follower.update_current(&update);

assert_eq!(follower.current, update.block_data().point());
assert_eq!(follower.previous, old_current);
assert_eq!(follower.fork, update.block_data().fork());
}

#[tokio::test]
async fn test_chain_follower_update_current() {
async fn test_chain_follower_update_current_immutable_roll_forward() {
let chain = Network::Mainnet;
let start = Point::new(100u64.into(), [0; 32].into());
let end = Point::fuzzy(999u64.into());

let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;

let block_data = mock_block();
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
let update = ChainUpdate::new(
chain_update::Kind::ImmutableBlockRollForward,
false,
block_data,
);

let result = follower.update_current(Some(&update.clone()));
let old_current = follower.current.clone();
follower.update_current(&update);

assert!(result);
assert_eq!(follower.current, update.block_data().point());
assert_eq!(follower.current, old_current);
}
}
2 changes: 1 addition & 1 deletion rust/cardano-chain-follower/src/mithril_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl MithrilSnapshot {

/// Read a single block from a known point.
#[allow(clippy::indexing_slicing)]
#[logcall("debug")]
//#[logcall("debug")]
pub(crate) async fn read_block_at(&self, point: &Point) -> Option<MultiEraBlock> {
if let Some(iterator) = self.try_read_blocks_from_point(point).await {
let block = iterator.next().await;
Expand Down
Loading