Skip to content

Commit df9af47

Browse files
stevenj and Mr-Leshiy
authored
fix(rust/cardano-chain-follower): fix immutable roll forward events (#245)
* fix(rust/cardano-chain-follower): fix immutable roll forward event generation
* fix(rust/cardano-chain-follower): remove excessive debug log
* fix(rust/cardano-chain-follower): Properly log and handle Immutable Roll Forwards in the example
* fix(rust/cardano-chain-follower): Add comments and adjust debug log to be more accurate
* fix(rust/cardano-chain-follower): Improve comments and update immutable_tip in the `update_current` function only.
* fix(rust/cardano-chain-follower): Finally fixes reporting of immutable roll forward events.
* chore(rust/cardano-chain-follower): Cleanup updating follower state after getting a new update (#250)
* cleanup update_current method usage
* wip
* add tests
* Update rust/cardano-chain-follower/src/follow.rs Co-authored-by: Steven Johnson <[email protected]>
* Update rust/cardano-chain-follower/src/follow.rs Co-authored-by: Steven Johnson <[email protected]>
* wip
* fix spelling
---------
Co-authored-by: Steven Johnson <[email protected]>
---------
Co-authored-by: Alex Pozhylenkov <[email protected]>
1 parent 6c1f9eb commit df9af47

File tree

7 files changed

+147
-85
lines changed

7 files changed

+147
-85
lines changed

rust/Justfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ run-mithril-download-example-preview: code-format code-lint
6161
run-mithril-download-example-mainnet: code-format code-lint
6262
cargo build -r --package cardano-chain-follower --example follow_chains --features mimalloc
6363
RUST_LOG="error,follow_chains=debug,cardano_chain_follower=debug,mithril-client=debug" \
64-
./target/release/examples/follow_chains --mainnet
64+
./target/release/examples/follow_chains --mainnet --mithril-sync-workers 64 --mithril-sync-chunk-size 16 --mithril-sync-queue-ahead=6
6565

6666
# Run long running developer test for mithril downloading.
6767
debug-heap-mithril-download-example: code-format code-lint

rust/cardano-chain-follower/examples/follow_chains.rs

+12
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,18 @@ async fn follow_for(network: Network, matches: ArgMatches) {
183183
while let Some(chain_update) = follower.next().await {
184184
updates = updates.saturating_add(1);
185185

186+
if chain_update.kind == Kind::ImmutableBlockRollForward {
187+
let immutable_tip = u64::from(chain_update.data.point().slot_or_default());
188+
let immutable = chain_update.immutable();
189+
info!(
190+
updates = updates,
191+
immutable_tip = immutable_tip,
192+
immutable = immutable,
193+
"Chain Immutable Roll Forward Detected."
194+
);
195+
continue; // Nothing else to do, so get next sequential block.
196+
}
197+
186198
if chain_update.tip {
187199
reached_tip = true;
188200
}

rust/cardano-chain-follower/src/chain_sync.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,11 @@ async fn live_sync_backfill_and_purge(
484484
}
485485

486486
debug!(
487-
"After Purge: Size of the Live Chain is: {} Blocks",
487+
"After Purge: Size of the Live Chain is: {} Blocks: Triggering Sleeping Followers.",
488488
live_chain_length(cfg.chain)
489489
);
490490

491+
// Trigger any sleeping followers that data has changed.
491492
notify_follower(
492493
cfg.chain,
493494
update_sender.as_ref(),

rust/cardano-chain-follower/src/follow.rs

+115-76
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
44
use pallas::network::miniprotocols::txmonitor::{TxBody, TxId};
55
use tokio::sync::broadcast::{self};
6-
use tracing::{debug, error};
6+
use tracing::{debug, error, warn};
77

88
use crate::{
99
chain_sync::point_at_tip,
@@ -14,7 +14,7 @@ use crate::{
1414
mithril_snapshot_data::latest_mithril_snapshot_id,
1515
mithril_snapshot_iterator::MithrilSnapshotIterator,
1616
stats::{self},
17-
Statistics,
17+
Kind, Statistics,
1818
};
1919

2020
/// The Chain Follower
@@ -85,9 +85,56 @@ impl ChainFollower {
8585

8686
/// If we can, get the next update from the mithril snapshot.
8787
async fn next_from_mithril(&mut self) -> Option<ChainUpdate> {
88-
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
88+
// This Loop allows us to re-try if we detect that the mithril snapshot has changed while
89+
// we were trying to read a block from it. Typically this function never loops.
90+
loop {
91+
let current_mithril_tip = latest_mithril_snapshot_id(self.chain).tip();
92+
93+
// Get previous mithril tip, or set it and return current if a previous does not exist.
94+
let previous_mithril_tip = self.mithril_tip.get_or_insert_with(|| {
95+
debug!(
96+
mithril_tip = ?current_mithril_tip,
97+
"Setting Initial Mithril Tip"
98+
);
99+
current_mithril_tip.clone()
100+
});
101+
102+
// Return an ImmutableBlockRollForward event as soon as we can after one occurs.
103+
// This is not an advancement in the follower's sequential block iterating state.
104+
// BUT it is a necessary status to return to a follower, so it can properly handle
105+
// when immutable state advances (if it so requires)
106+
if current_mithril_tip != *previous_mithril_tip {
107+
debug!(
108+
new_tip = ?self.mithril_tip,
109+
current_tip = ?current_mithril_tip,
110+
"Mithril Tip has changed"
111+
);
112+
// We have a new mithril tip so report Mithril Tip Roll Forward
113+
if let Some(block) = self.snapshot.read_block_at(&current_mithril_tip).await {
114+
// Update the snapshot in the follower state to the new snapshot.
115+
let update = ChainUpdate::new(
116+
chain_update::Kind::ImmutableBlockRollForward,
117+
false, // Tip is Live chain tip, not Mithril Tip, and this is Mithril Tip.
118+
block,
119+
);
120+
return Some(update);
121+
}
122+
// This can only happen if the snapshot does not contain the tip block.
123+
// So it's effectively impossible/unreachable.
124+
// However, IF it does happen, nothing bad (other than a delay to reporting
125+
// immutable roll forward) will occur, so we log this impossible
126+
// error, and continue processing.
127+
error!(
128+
tip = ?self.mithril_tip,
129+
current = ?current_mithril_tip,
130+
"Mithril Tip Block is not in snapshot. Should not happen."
131+
);
132+
}
133+
134+
if current_mithril_tip <= self.current {
135+
break;
136+
}
89137

90-
if current_mithril_tip > self.current {
91138
if self.mithril_follower.is_none() {
92139
self.mithril_follower = self
93140
.snapshot
@@ -97,40 +144,22 @@ impl ChainFollower {
97144

98145
if let Some(follower) = self.mithril_follower.as_mut() {
99146
if let Some(next) = follower.next().await {
100-
// debug!("Pre Previous update 3 : {:?}", self.previous);
101-
self.previous = self.current.clone();
102-
// debug!("Post Previous update 3 : {:?}", self.previous);
103-
self.current = next.point();
104-
self.fork = Fork::IMMUTABLE; // Mithril Immutable data is always Fork 0.
105147
let update = ChainUpdate::new(chain_update::Kind::Block, false, next);
106148
return Some(update);
107149
}
108-
}
109-
}
110150

111-
let roll_forward_condition = if let Some(mithril_tip) = &self.mithril_tip {
112-
current_mithril_tip > *mithril_tip && *mithril_tip > self.current
113-
} else {
114-
true
115-
};
116-
117-
if roll_forward_condition {
118-
let snapshot = MithrilSnapshot::new(self.chain);
119-
if let Some(block) = snapshot.read_block_at(&current_mithril_tip).await {
120-
// The Mithril Tip has moved forwards.
121-
self.mithril_tip = Some(current_mithril_tip);
122-
// Get the mithril tip block.
123-
let update =
124-
ChainUpdate::new(chain_update::Kind::ImmutableBlockRollForward, false, block);
125-
return Some(update);
151+
// Verifying ultra rare scenario of race condition on the mithril snapshot data
152+
// directory, where the underlying data directory could be no longer accessible
153+
if follower.is_valid() {
154+
break;
155+
}
156+
// Set the mithril follower to None and restart the loop
157+
warn!("Detected Mithril snapshot data directory race condition, underlying data directory is not accessible anymore: Correcting...");
158+
self.mithril_follower = None;
159+
} else {
160+
break;
126161
}
127-
error!(
128-
tip = ?self.mithril_tip,
129-
current = ?current_mithril_tip,
130-
"Mithril Tip Block is not in snapshot. Should not happen."
131-
);
132162
}
133-
134163
None
135164
}
136165

@@ -198,11 +227,6 @@ impl ChainFollower {
198227
rollback_depth,
199228
);
200229
}
201-
// debug!("Pre Previous update 4 : {:?}", self.previous);
202-
self.previous = self.current.clone();
203-
// debug!("Post Previous update 4 : {:?}", self.previous);
204-
self.current = next_block.point().clone();
205-
self.fork = next_block.fork();
206230

207231
let tip = point_at_tip(self.chain, &self.current).await;
208232
let update = ChainUpdate::new(update_type, tip, next_block);
@@ -213,13 +237,22 @@ impl ChainFollower {
213237
}
214238

215239
/// Update the current Point, and return `false` if this fails.
216-
fn update_current(&mut self, update: Option<&ChainUpdate>) -> bool {
217-
if let Some(update) = update {
218-
let decoded = update.block_data().decode();
219-
self.current = Point::new(decoded.slot().into(), decoded.hash().into());
220-
return true;
240+
fn update_current(&mut self, update: &ChainUpdate) {
241+
if update.kind == Kind::ImmutableBlockRollForward {
242+
// The ImmutableBlockRollForward includes the Mithril TIP Block.
243+
// Update the mithril_tip state to the point of it.
244+
self.mithril_tip = Some(update.data.point());
245+
debug!(mithril_tip=?self.mithril_tip, "Updated followers current Mithril Tip");
246+
// Reset mithril snapshot follower, because underlying data directory was renamed
247+
self.mithril_follower = None;
248+
// We DO NOT update anything else for this kind of update, as it's informational and
249+
// does not advance the state of the follower to a new block.
250+
return;
221251
}
222-
false
252+
// Avoids doing unnecessary clones.
253+
std::mem::swap(&mut self.previous, &mut self.current);
254+
self.current = update.block_data().point();
255+
self.fork = update.block_data().fork();
223256
}
224257

225258
/// This is an unprotected version of `next()` which can ONLY be used within this
@@ -229,53 +262,49 @@ impl ChainFollower {
229262
/// This function can NOT return None, but that state is used to help process data.
230263
///
231264
/// This function must not be exposed for general use.
232-
#[allow(clippy::unused_async)]
233-
pub(crate) async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
234-
let mut update;
235-
265+
async fn unprotected_next(&mut self) -> Option<ChainUpdate> {
236266
// We will loop here until we can successfully return a new block
237267
loop {
238-
// Check if Immutable TIP has advanced, and if so, send a ChainUpdate about it.
239-
// Should only happen once every ~6hrs.
240-
// TODO.
241-
242268
// Try and get the next update from the mithril chain, and return it if we are
243269
// successful.
244-
update = self.next_from_mithril().await;
245-
if update.is_some() {
246-
break;
270+
if let Some(update) = self.next_from_mithril().await {
271+
self.update_current(&update);
272+
return Some(update);
247273
}
248274

249275
// No update from Mithril Data, so try and get one from the live chain.
250-
update = self.next_from_live_chain().await;
251-
if update.is_some() {
252-
break;
276+
if let Some(update) = self.next_from_live_chain().await {
277+
self.update_current(&update);
278+
return Some(update);
253279
}
254280

255281
// IF we can't get a new block directly from the mithril data, or the live chain, then
256282
// wait for something to change which might mean we can get the next block.
257-
let update = self.sync_updates.recv().await;
258-
match update {
283+
// Note, this is JUST a trigger, we don't process based on it other than to allow
284+
// a blocked follower to continue.
285+
let changed_data_trigger = self.sync_updates.recv().await;
286+
match changed_data_trigger {
259287
Ok(kind) => {
288+
// The KIND of event signaling changed data is not important, but we do log it
289+
// to help with debugging in case an update stops.
260290
debug!("Update kind: {kind}");
261291
},
262292
Err(tokio::sync::broadcast::error::RecvError::Lagged(distance)) => {
293+
// The update queue is small, so it's possible that it fills before a task can
294+
// read from it, this will cause this Lagged error.
295+
// BUT, because we don't care what the event was, this is as good as the missed
296+
// event. Therefore it's not an error, and we just log it at debug to help with
297+
// debugging the logic only.
263298
debug!("Lagged by {} updates", distance);
264299
},
265300
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
266-
// We are closed, so we need to wait for the next update.
301+
// The queue is closed, so we need to return that it's no longer possible to
302+
// get data from this follower.
267303
// This is not an error.
268304
return None;
269305
},
270306
}
271307
}
272-
273-
// Update the current block, so we know which one to get next.
274-
if !self.update_current(update.as_ref()) {
275-
return None;
276-
}
277-
278-
update
279308
}
280309

281310
/// Get the next block from the follower.
@@ -405,36 +434,46 @@ mod tests {
405434
assert_eq!(follower.current, start);
406435
assert_eq!(follower.fork, 1.into());
407436
assert!(follower.mithril_follower.is_none());
408-
assert!(follower.mithril_tip.is_none());
437+
// assert!(follower.mithril_tip.is_none());
409438
}
410439

411440
#[tokio::test]
412-
async fn test_chain_follower_update_current_none() {
441+
async fn test_chain_follower_update_current() {
413442
let chain = Network::Mainnet;
414443
let start = Point::new(100u64.into(), [0; 32].into());
415444
let end = Point::fuzzy(999u64.into());
416445

417446
let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;
418447

419-
let result = follower.update_current(None);
448+
let block_data = mock_block();
449+
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
420450

421-
assert!(!result);
451+
let old_current = follower.current.clone();
452+
follower.update_current(&update);
453+
454+
assert_eq!(follower.current, update.block_data().point());
455+
assert_eq!(follower.previous, old_current);
456+
assert_eq!(follower.fork, update.block_data().fork());
422457
}
423458

424459
#[tokio::test]
425-
async fn test_chain_follower_update_current() {
460+
async fn test_chain_follower_update_current_immutable_roll_forward() {
426461
let chain = Network::Mainnet;
427462
let start = Point::new(100u64.into(), [0; 32].into());
428463
let end = Point::fuzzy(999u64.into());
429464

430465
let mut follower = ChainFollower::new(chain, start.clone(), end.clone()).await;
431466

432467
let block_data = mock_block();
433-
let update = ChainUpdate::new(chain_update::Kind::Block, false, block_data);
468+
let update = ChainUpdate::new(
469+
chain_update::Kind::ImmutableBlockRollForward,
470+
false,
471+
block_data,
472+
);
434473

435-
let result = follower.update_current(Some(&update.clone()));
474+
let old_current = follower.current.clone();
475+
follower.update_current(&update);
436476

437-
assert!(result);
438-
assert_eq!(follower.current, update.block_data().point());
477+
assert_eq!(follower.current, old_current);
439478
}
440479
}

rust/cardano-chain-follower/src/mithril_snapshot.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl MithrilSnapshot {
7777

7878
/// Read a single block from a known point.
7979
#[allow(clippy::indexing_slicing)]
80-
#[logcall("debug")]
80+
//#[logcall("debug")]
8181
pub(crate) async fn read_block_at(&self, point: &Point) -> Option<MultiEraBlock> {
8282
if let Some(iterator) = self.try_read_blocks_from_point(point).await {
8383
let block = iterator.next().await;

0 commit comments

Comments
 (0)