Track segment indexes correctly during DSN sync #3521

Merged (10 commits, May 28, 2025)
9 changes: 8 additions & 1 deletion crates/subspace-core-primitives/src/segments.rs
@@ -128,7 +128,7 @@ impl SegmentIndex {
source_first_piece_indices
}

/// Checked integer subtraction. Computes `self - rhs`, returning `None` if overflow occurred.
/// Checked integer subtraction. Computes `self - rhs`, returning `None` if underflow occurred.
#[inline]
pub const fn checked_sub(self, rhs: Self) -> Option<Self> {
// TODO: when Option::map becomes const, use it here
@@ -137,6 +137,13 @@ impl SegmentIndex {
None => None,
}
}

/// Saturating integer subtraction. Computes `self - rhs`, returning zero if underflow
/// occurred.
#[inline]
pub const fn saturating_sub(self, rhs: Self) -> Self {
Self(self.0.saturating_sub(rhs.0))
}
}

/// Segment commitment contained within segment header.
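As a quick illustration of the two subtraction flavours on SegmentIndex, here is a minimal sketch (not code from this PR; it assumes the type's usual PartialEq and Debug derives, as implied by its use elsewhere in these changes):

use subspace_core_primitives::segments::SegmentIndex; // import path assumed

fn main() {
    // `checked_sub` signals underflow by returning `None`.
    assert_eq!(SegmentIndex::ONE.checked_sub(SegmentIndex::ONE), Some(SegmentIndex::ZERO));
    assert_eq!(SegmentIndex::ZERO.checked_sub(SegmentIndex::ONE), None);

    // `saturating_sub` clamps at zero instead, which is what the sync code below relies on
    // when stepping back one segment from a segment index that might already be zero.
    assert_eq!(SegmentIndex::ZERO.saturating_sub(SegmentIndex::ONE), SegmentIndex::ZERO);
    assert_eq!(SegmentIndex::ONE.saturating_sub(SegmentIndex::ONE), SegmentIndex::ZERO);
}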
57 changes: 44 additions & 13 deletions crates/subspace-service/src/sync_from_dsn.rs
@@ -303,26 +303,40 @@ where
.chain_constants(info.best_hash)
.map_err(|error| error.to_string())?;

// Corresponds to contents of block one, everyone has it, so we consider it being processed
// right away
let mut last_processed_segment_index = SegmentIndex::ZERO;
// This is the last segment index that has been fully processed by DSN sync.
// If a segment has a partial block at the end, it is not fully processed until that block is
// processed.
//
// Segment zero corresponds to the contents of the genesis block. Everyone has it, so we consider
// it processed right away.
let mut last_completed_segment_index = SegmentIndex::ZERO;

// This is the last block number that has been queued for import by DSN sync.
// (Or we've checked for its header and it has already been imported.)
//
// TODO: We'll be able to just take finalized block once we are able to decouple pruning from
// finality: https://github.com/paritytech/polkadot-sdk/issues/1570
let mut last_processed_block_number = info.best_number;
let segment_header_downloader = SegmentHeaderDownloader::new(node);

while let Some(reason) = notifications.next().await {
info!(
target: LOG_TARGET,
?reason,
?last_completed_segment_index,
?last_processed_block_number,
"Received notification to sync from DSN, deactivating substrate sync"
);
pause_sync.store(true, Ordering::Release);

info!(target: LOG_TARGET, ?reason, "Received notification to sync from DSN");
// TODO: Maybe handle failed block imports, additional helpful logging
let import_blocks_from_dsn_fut = import_blocks_from_dsn(
&segment_headers_store,
&segment_header_downloader,
client,
piece_getter,
import_queue_service,
&mut last_processed_segment_index,
&mut last_completed_segment_index,
&mut last_processed_block_number,
erasure_coding,
);
@@ -341,6 +355,12 @@ where
.map(|diff| diff < chain_constants.confirmation_depth_k().into())
.unwrap_or_default()
{
debug!(
target: LOG_TARGET,
best_block = ?info.best_number,
?target_block_number,
"Node is almost synced, stopping DSN sync until the next notification"
);
break;
}
}
@@ -349,27 +369,38 @@
select! {
result = import_blocks_from_dsn_fut.fuse() => {
if let Err(error) = result {
warn!(target: LOG_TARGET, %error, "Error when syncing blocks from DSN");
warn!(
target: LOG_TARGET,
%error,
?last_completed_segment_index,
?last_processed_block_number,
"Error when syncing blocks from DSN, stopping DSN sync until the next notification"
);
}
}
_ = wait_almost_synced_fut.fuse() => {
// Almost synced, DSN sync can't possibly help here
}
}

debug!(target: LOG_TARGET, "Finished DSN sync");
while notifications.try_next().is_ok() {
Member: Why was it needed to reorder this? It was at the end to logically show that we discard all the extra notifications that were issued while we were processing the previous notification. I don't see how it fixes or breaks anything being moved here, but I also don't understand why it is needed.

Member Author: I'm not particularly committed to this part of the change. The block gap was sometimes being hidden by substrate sync, which was one reason the bug was hard to diagnose and fix. Reducing the amount of time that the atomic is set to "true" in each loop reduces the chance that substrate sync hides any bugs, if there are a lot of sync notifications being sent continuously.

Member: I see, so the goal was to move it before the atomic specifically. Wasn't obvious, but makes sense now, thanks.

// Just drain extra messages if there are any
}

// This will notify Substrate's sync mechanism and allow regular Substrate sync to continue
// gracefully
// Notify Substrate's sync mechanism to allow regular Substrate sync to continue
// gracefully. We do this at the end of the loop, to minimise race conditions which can
// hide DSN sync bugs.
debug!(
target: LOG_TARGET,
?last_completed_segment_index,
?last_processed_block_number,
"Finished DSN sync, activating substrate sync"
);
{
let info = client.info();
network_block.new_best_block_imported(info.best_hash, info.best_number);
}
pause_sync.store(false, Ordering::Release);

while notifications.try_next().is_ok() {
// Just drain extra messages if there are any
}
}

Ok(())
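The ordering discussed in the thread above, reduced to a minimal standalone sketch (the channel type and wiring are simplified stand-ins, not the actual service code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures::channel::mpsc;
use futures::StreamExt;

// Substrate sync stays paused while we handle a notification and drain any stale ones,
// and is only resumed at the very end of each iteration, keeping the window in which both
// sync mechanisms run (and can mask each other's bugs) as small as possible.
async fn sync_loop(pause_sync: Arc<AtomicBool>, mut notifications: mpsc::Receiver<()>) {
    while let Some(_reason) = notifications.next().await {
        pause_sync.store(true, Ordering::Release);

        // ... DSN sync work for this notification goes here ...

        // Drain notifications that piled up while we were busy, before resuming
        // substrate sync, so the flag isn't immediately flipped back and forth.
        while notifications.try_next().is_ok() {
            // Just drain extra messages if there are any
        }

        pause_sync.store(false, Ordering::Release);
    }
}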
61 changes: 51 additions & 10 deletions crates/subspace-service/src/sync_from_dsn/import_blocks.rs
@@ -5,7 +5,7 @@ use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::IncomingBlock;
use sc_consensus_subspace::archiver::{decode_block, encode_block, SegmentHeadersStore};
use sc_service::Error;
use sc_tracing::tracing::{debug, trace};
use sc_tracing::tracing::{debug, info, trace};
use sp_consensus::BlockOrigin;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
@@ -35,7 +35,7 @@ pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
client: &Client,
piece_getter: &PG,
import_queue_service: &mut IQS,
last_processed_segment_index: &mut SegmentIndex,
last_completed_segment_index: &mut SegmentIndex,
last_processed_block_number: &mut NumberFor<Block>,
erasure_coding: &ErasureCoding,
) -> Result<u64, Error>
@@ -70,7 +70,7 @@ where
let mut imported_blocks = 0;
let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
// Start from the first unprocessed segment and process all segments known so far
let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
let segment_indices_iter = (*last_completed_segment_index + SegmentIndex::ONE)
..=segment_headers_store
.max_segment_index()
.expect("Exists, we have inserted segment headers above; qed");
@@ -105,20 +105,56 @@ where
// so it can't change. Resetting the reconstructor loses any partial blocks, so we
// only reset if the (possibly partial) last block has been processed.
if *last_processed_block_number >= last_archived_maybe_partial_block_number {
*last_processed_segment_index = segment_index;
debug!(
target: LOG_TARGET,
%segment_index,
%last_processed_block_number,
%last_archived_maybe_partial_block_number,
%last_archived_block_partial,
"Already processed last (possibly partial) block in segment, resetting reconstructor",
);
*last_completed_segment_index = segment_index;
// Reset reconstructor instance
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
continue;
}
// Just one partial unprocessed block and this was the last segment available, so nothing to
// import
// import. (But we also haven't finished this segment yet, because of the partial block.)
if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
&& last_archived_block_partial
&& segment_indices_iter.peek().is_none()
{
// Reset reconstructor instance
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
continue;
if segment_indices_iter.peek().is_none() {
// We haven't fully processed this segment yet, because it ends with a partial block.
*last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);

// We don't need to reset the reconstructor here. We've finished getting blocks, so
// we're about to return and drop the reconstructor and its partial block anyway.
// (Normally, we'd need that partial block to avoid a block gap. But we should be close
// enough to the tip that normal syncing will fill any gaps.)
debug!(
target: LOG_TARGET,
%segment_index,
%last_processed_block_number,
%last_archived_maybe_partial_block_number,
%last_archived_block_partial,
"No more segments, snap sync is about to finish",
);
continue;
} else {
// Downloading an entire segment for one partial block should be rare, but if it
// happens a lot we want to see it in the logs.
//
// TODO: if this happens a lot, check for network/DSN sync bugs - we should be able
// to sync to near the tip reliably, so we don't have to keep reconstructor state.
info!(
target: LOG_TARGET,
%segment_index,
%last_processed_block_number,
%last_archived_maybe_partial_block_number,
%last_archived_block_partial,
"Downloading entire segment for one partial block",
);
}
}

let segment_pieces = download_segment_pieces(segment_index, piece_getter)
@@ -239,7 +275,12 @@ where
import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
}

*last_processed_segment_index = segment_index;
Member Author: I think the problem is in this line in the main branch:

  • when we update the segment index above, we check we've fully processed all blocks from that segment
  • but when we update it here, we don't check for a partial block at the end of the segment
        // Segments are only fully processed when all their blocks are processed.
        if last_archived_block_partial {
            *last_processed_segment_index = segment_index.saturating_sub(1);
        } else {
           *last_processed_segment_index = segment_index;
        } 

// Segments are only fully processed when all their blocks are fully processed.
if last_archived_block_partial {
*last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
} else {
*last_completed_segment_index = segment_index;
}
}

Ok(imported_blocks)
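To isolate the rule being fixed above, here is a minimal standalone sketch (the free-function form and its name are illustrative only, not code from this PR):

use subspace_core_primitives::segments::SegmentIndex; // import path assumed

/// Given that all available blocks of `segment_index` have just been queued for import,
/// return the last segment that is now fully completed.
fn last_completed_after(segment_index: SegmentIndex, last_archived_block_partial: bool) -> SegmentIndex {
    if last_archived_block_partial {
        // The trailing block spills into the next segment, so this segment only counts as
        // completed once that block has been processed later.
        segment_index.saturating_sub(SegmentIndex::ONE)
    } else {
        segment_index
    }
}

Skipping the `last_archived_block_partial` check at this point was what let the tracked segment index run ahead of the blocks actually imported on the main branch.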
@@ -13,10 +13,10 @@ use subspace_networking::Node;
use tracing::{debug, error, trace, warn};

const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
/// Initial number of peers to query for last segment header
/// Initial number of peers to query for last segment header.
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
/// How many distinct peers to try when downloading segment headers
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 10;
/// How many distinct peers to try before giving up on downloading segment header batches.
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 20;

/// Helps downloader segment headers from DSN
pub struct SegmentHeaderDownloader {
2 changes: 1 addition & 1 deletion domains/client/domain-operator/src/snap_sync.rs
@@ -416,7 +416,7 @@ where
{
let block_number = *header.number();

const STATE_SYNC_RETRIES: u32 = 5;
const STATE_SYNC_RETRIES: u32 = 20;
Contributor: Why do we need such high state sync retries? So far 5 was sufficient. Not opposing it, but I want to know the reason behind the change.

Member Author: It was sufficient for some nodes, but not others. Both Jim and I got failures here during testing, and the state sync is small, so it's cheaper to retry a few more times here than give up and start the entire process again (or require manual operator intervention).

const LOOP_PAUSE: Duration = Duration::from_secs(20);
const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;

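For context on the retry discussion above, this is roughly the shape such a bounded retry loop takes (a generic sketch; `try_state_sync` and the surrounding wiring are placeholders, not the actual snap_sync implementation):

use std::time::Duration;

const STATE_SYNC_RETRIES: u32 = 20;
const LOOP_PAUSE: Duration = Duration::from_secs(20);

// Placeholder for a single state sync request to a peer.
async fn try_state_sync() -> Result<(), String> {
    Ok(())
}

async fn sync_state_with_retries() -> Result<(), String> {
    for attempt in 1..=STATE_SYNC_RETRIES {
        match try_state_sync().await {
            Ok(()) => return Ok(()),
            Err(error) => {
                // One state sync attempt is cheap compared to restarting the whole snap
                // sync process, so pause briefly and try again, ideally with another peer.
                tracing::debug!(%error, attempt, "State sync attempt failed, retrying");
                tokio::time::sleep(LOOP_PAUSE).await;
            }
        }
    }
    Err("state sync failed after all retries".to_string())
}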