Skip to content

Commit 93a0a25

Browse files
authored
Merge pull request #3521 from autonomys/keep-reconstructor-state
Track segment indexes correctly during DSN sync
2 parents 085ec66 + 083b8d3 commit 93a0a25

File tree

5 files changed

+107
-28
lines changed

5 files changed

+107
-28
lines changed

crates/subspace-core-primitives/src/segments.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl SegmentIndex {
128128
source_first_piece_indices
129129
}
130130

131-
/// Checked integer subtraction. Computes `self - rhs`, returning `None` if overflow occurred.
131+
/// Checked integer subtraction. Computes `self - rhs`, returning `None` if underflow occurred.
132132
#[inline]
133133
pub const fn checked_sub(self, rhs: Self) -> Option<Self> {
134134
// TODO: when Option::map becomes const, use it here
@@ -137,6 +137,13 @@ impl SegmentIndex {
137137
None => None,
138138
}
139139
}
140+
141+
/// Saturating integer subtraction. Computes `self - rhs`, returning zero if underflow
142+
/// occurred.
143+
#[inline]
144+
pub const fn saturating_sub(self, rhs: Self) -> Self {
145+
Self(self.0.saturating_sub(rhs.0))
146+
}
140147
}
141148

142149
/// Segment commitment contained within segment header.

crates/subspace-service/src/sync_from_dsn.rs

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -303,26 +303,40 @@ where
303303
.chain_constants(info.best_hash)
304304
.map_err(|error| error.to_string())?;
305305

306-
// Corresponds to contents of block one, everyone has it, so we consider it being processed
307-
// right away
308-
let mut last_processed_segment_index = SegmentIndex::ZERO;
306+
// This is the last segment index that has been fully processed by DSN sync.
307+
// If a segment has a partial block at the end, it is not fully processed until that block is
308+
// processed.
309+
//
310+
// Segment zero corresponds to contents of the genesis block, everyone has it, so we consider it as
311+
// processed right away.
312+
let mut last_completed_segment_index = SegmentIndex::ZERO;
313+
314+
// This is the last block number that has been queued for import by DSN sync.
315+
// (Or we've checked for its header and it has already been imported.)
316+
//
309317
// TODO: We'll be able to just take finalized block once we are able to decouple pruning from
310318
// finality: https://github.com/paritytech/polkadot-sdk/issues/1570
311319
let mut last_processed_block_number = info.best_number;
312320
let segment_header_downloader = SegmentHeaderDownloader::new(node);
313321

314322
while let Some(reason) = notifications.next().await {
323+
info!(
324+
target: LOG_TARGET,
325+
?reason,
326+
?last_completed_segment_index,
327+
?last_processed_block_number,
328+
"Received notification to sync from DSN, deactivating substrate sync"
329+
);
315330
pause_sync.store(true, Ordering::Release);
316331

317-
info!(target: LOG_TARGET, ?reason, "Received notification to sync from DSN");
318332
// TODO: Maybe handle failed block imports, additional helpful logging
319333
let import_blocks_from_dsn_fut = import_blocks_from_dsn(
320334
&segment_headers_store,
321335
&segment_header_downloader,
322336
client,
323337
piece_getter,
324338
import_queue_service,
325-
&mut last_processed_segment_index,
339+
&mut last_completed_segment_index,
326340
&mut last_processed_block_number,
327341
erasure_coding,
328342
);
@@ -341,6 +355,12 @@ where
341355
.map(|diff| diff < chain_constants.confirmation_depth_k().into())
342356
.unwrap_or_default()
343357
{
358+
debug!(
359+
target: LOG_TARGET,
360+
best_block = ?info.best_number,
361+
?target_block_number,
362+
"Node is almost synced, stopping DSN sync until the next notification"
363+
);
344364
break;
345365
}
346366
}
@@ -349,27 +369,38 @@ where
349369
select! {
350370
result = import_blocks_from_dsn_fut.fuse() => {
351371
if let Err(error) = result {
352-
warn!(target: LOG_TARGET, %error, "Error when syncing blocks from DSN");
372+
warn!(
373+
target: LOG_TARGET,
374+
%error,
375+
?last_completed_segment_index,
376+
?last_processed_block_number,
377+
"Error when syncing blocks from DSN, stopping DSN sync until the next notification"
378+
);
353379
}
354380
}
355381
_ = wait_almost_synced_fut.fuse() => {
356382
// Almost synced, DSN sync can't possibly help here
357383
}
358384
}
359385

360-
debug!(target: LOG_TARGET, "Finished DSN sync");
386+
while notifications.try_next().is_ok() {
387+
// Just drain extra messages if there are any
388+
}
361389

362-
// This will notify Substrate's sync mechanism and allow regular Substrate sync to continue
363-
// gracefully
390+
// Notify Substrate's sync mechanism to allow regular Substrate sync to continue
391+
// gracefully. We do this at the end of the loop, to minimise race conditions which can
392+
// hide DSN sync bugs.
393+
debug!(
394+
target: LOG_TARGET,
395+
?last_completed_segment_index,
396+
?last_processed_block_number,
397+
"Finished DSN sync, activating substrate sync"
398+
);
364399
{
365400
let info = client.info();
366401
network_block.new_best_block_imported(info.best_hash, info.best_number);
367402
}
368403
pause_sync.store(false, Ordering::Release);
369-
370-
while notifications.try_next().is_ok() {
371-
// Just drain extra messages if there are any
372-
}
373404
}
374405

375406
Ok(())

crates/subspace-service/src/sync_from_dsn/import_blocks.rs

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use sc_consensus::import_queue::ImportQueueService;
55
use sc_consensus::IncomingBlock;
66
use sc_consensus_subspace::archiver::{decode_block, encode_block, SegmentHeadersStore};
77
use sc_service::Error;
8-
use sc_tracing::tracing::{debug, trace};
8+
use sc_tracing::tracing::{debug, info, trace};
99
use sp_consensus::BlockOrigin;
1010
use sp_runtime::generic::SignedBlock;
1111
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
@@ -35,7 +35,7 @@ pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
3535
client: &Client,
3636
piece_getter: &PG,
3737
import_queue_service: &mut IQS,
38-
last_processed_segment_index: &mut SegmentIndex,
38+
last_completed_segment_index: &mut SegmentIndex,
3939
last_processed_block_number: &mut NumberFor<Block>,
4040
erasure_coding: &ErasureCoding,
4141
) -> Result<u64, Error>
@@ -70,7 +70,7 @@ where
7070
let mut imported_blocks = 0;
7171
let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
7272
// Start from the first unprocessed segment and process all segments known so far
73-
let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
73+
let segment_indices_iter = (*last_completed_segment_index + SegmentIndex::ONE)
7474
..=segment_headers_store
7575
.max_segment_index()
7676
.expect("Exists, we have inserted segment headers above; qed");
@@ -105,20 +105,56 @@ where
105105
// so it can't change. Resetting the reconstructor loses any partial blocks, so we
106106
// only reset if the (possibly partial) last block has been processed.
107107
if *last_processed_block_number >= last_archived_maybe_partial_block_number {
108-
*last_processed_segment_index = segment_index;
108+
debug!(
109+
target: LOG_TARGET,
110+
%segment_index,
111+
%last_processed_block_number,
112+
%last_archived_maybe_partial_block_number,
113+
%last_archived_block_partial,
114+
"Already processed last (possibly partial) block in segment, resetting reconstructor",
115+
);
116+
*last_completed_segment_index = segment_index;
109117
// Reset reconstructor instance
110118
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
111119
continue;
112120
}
113121
// Just one partial unprocessed block and this was the last segment available, so nothing to
114-
// import
122+
// import. (But we also haven't finished this segment yet, because of the partial block.)
115123
if last_archived_maybe_partial_block_number == *last_processed_block_number + One::one()
116124
&& last_archived_block_partial
117-
&& segment_indices_iter.peek().is_none()
118125
{
119-
// Reset reconstructor instance
120-
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
121-
continue;
126+
if segment_indices_iter.peek().is_none() {
127+
// We haven't fully processed this segment yet, because it ends with a partial block.
128+
*last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
129+
130+
// We don't need to reset the reconstructor here. We've finished getting blocks, so
131+
// we're about to return and drop the reconstructor and its partial block anyway.
132+
// (Normally, we'd need that partial block to avoid a block gap. But we should be close
133+
// enough to the tip that normal syncing will fill any gaps.)
134+
debug!(
135+
target: LOG_TARGET,
136+
%segment_index,
137+
%last_processed_block_number,
138+
%last_archived_maybe_partial_block_number,
139+
%last_archived_block_partial,
140+
"No more segments, snap sync is about to finish",
141+
);
142+
continue;
143+
} else {
144+
// Downloading an entire segment for one partial block should be rare, but if it
145+
// happens a lot we want to see it in the logs.
146+
//
147+
// TODO: if this happens a lot, check for network/DSN sync bugs - we should be able
148+
// to sync to near the tip reliably, so we don't have to keep reconstructor state.
149+
info!(
150+
target: LOG_TARGET,
151+
%segment_index,
152+
%last_processed_block_number,
153+
%last_archived_maybe_partial_block_number,
154+
%last_archived_block_partial,
155+
"Downloading entire segment for one partial block",
156+
);
157+
}
122158
}
123159

124160
let segment_pieces = download_segment_pieces(segment_index, piece_getter)
@@ -239,7 +275,12 @@ where
239275
import_queue_service.import_blocks(BlockOrigin::NetworkInitialSync, blocks_to_import);
240276
}
241277

242-
*last_processed_segment_index = segment_index;
278+
// Segments are only fully processed when all their blocks are fully processed.
279+
if last_archived_block_partial {
280+
*last_completed_segment_index = segment_index.saturating_sub(SegmentIndex::ONE);
281+
} else {
282+
*last_completed_segment_index = segment_index;
283+
}
243284
}
244285

245286
Ok(imported_blocks)

crates/subspace-service/src/sync_from_dsn/segment_header_downloader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ use subspace_networking::Node;
1313
use tracing::{debug, error, trace, warn};
1414

1515
const SEGMENT_HEADER_NUMBER_PER_REQUEST: u64 = 1000;
16-
/// Initial number of peers to query for last segment header
16+
/// Initial number of peers to query for last segment header.
1717
const SEGMENT_HEADER_CONSENSUS_INITIAL_NODES: usize = 20;
18-
/// How many distinct peers to try when downloading segment headers
19-
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 10;
18+
/// How many distinct peers to try before giving up on downloading segment headers batches.
19+
const SEGMENT_HEADER_PEERS_RETRIES: u32 = 20;
2020

2121
/// Helps downloader segment headers from DSN
2222
pub struct SegmentHeaderDownloader {

domains/client/domain-operator/src/snap_sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ where
416416
{
417417
let block_number = *header.number();
418418

419-
const STATE_SYNC_RETRIES: u32 = 5;
419+
const STATE_SYNC_RETRIES: u32 = 20;
420420
const LOOP_PAUSE: Duration = Duration::from_secs(20);
421421
const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
422422

0 commit comments

Comments
 (0)