Skip to content

Commit

Permalink
repair: send stake weights and slot number sooner
Browse files Browse the repository at this point in the history
  • Loading branch information
ibhatt-jumptrading committed Jan 10, 2025
1 parent 4687dfa commit e0465c9
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 50 deletions.
64 changes: 53 additions & 11 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,18 @@ tpool_boot( fd_topo_t * topo, ulong total_thread_count ) {
fd_tile_private_map_boot( tile_to_cpu, thread_count );
}

/* kickoff_repair_orphans initializes the blockstore from the current
   slot bank, publishes the stake weights and advances the watermark
   fseq (ctx->wmk) to the slot bank's slot.  Per the comments at its
   call sites in read_snapshot, the snapshot slot and the stake weights
   are what repair needs in order to start, so this is invoked as soon
   as the snapshot manifest has been loaded.

   ctx  is the replay tile context (slot_ctx, blockstore_fd and wmk are
        read from it).
   stem is forwarded unchanged to publish_stake_weights. */
static void
kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {

/* Initialize the blockstore with the slot bank, bracketed by the
   blockstore start/end write calls. */
fd_blockstore_start_write( ctx->slot_ctx->blockstore );
fd_blockstore_init( ctx->slot_ctx->blockstore, ctx->blockstore_fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank );
fd_blockstore_end_write( ctx->slot_ctx->blockstore );

/* Publish the stake weights, then set the watermark to the slot
   currently recorded in the slot bank. */
publish_stake_weights( ctx, stem, ctx->slot_ctx );
fd_fseq_update( ctx->wmk, ctx->slot_ctx->slot_bank.slot );

}

static void
read_snapshot( void * _ctx,
fd_stem_context_t * stem,
Expand All @@ -1827,7 +1839,42 @@ read_snapshot( void * _ctx,
/* Funk already has a snapshot loaded */
fd_runtime_recover_banks( ctx->slot_ctx, 0, 1 );
} else {
fd_snapshot_load_all( snapshot, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_FULL );

/* If we have an incremental snapshot, try to prefetch the snapshot slot
and manifest as soon as possible. In order to kick off repair effectively
we need the snapshot slot and the stake weights. These are both available
in the manifest. We will try to load in the manifest from the latest
snapshot that is available, then set up the blockstore and publish the
stake weights. After this, repair will kick off concurrently with loading
the rest of the snapshots. */

if( strlen( incremental )>0UL ) {
uchar * tmp_mem = fd_scratch_alloc( fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem, incremental, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_FULL );
fd_snapshot_load_prefetch_manifest( tmp_snap_ctx );
kickoff_repair_orphans( ctx, stem );
}

/* In order to kick off repair effectively we need the snapshot slot and
the stake weights. These are both available in the manifest. We will
try to load in the manifest from the latest snapshot that is available,
then set up the blockstore and publish the stake weights. After this,
repair will kick off concurrently with loading the rest of the snapshots. */

uchar * mem = fd_scratch_alloc( fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
fd_snapshot_load_ctx_t * snap_ctx = fd_snapshot_load_new( mem, snapshot, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_FULL );

fd_snapshot_load_init( snap_ctx );
fd_snapshot_load_manifest_and_status_cache( snap_ctx );

if( strlen( incremental )<=0UL ) {
/* If we don't have an incremental snapshot, we can still kick off
sending the stake weights and snapshot slot to repair. */
kickoff_repair_orphans( ctx, stem );
}

fd_snapshot_load_accounts( snap_ctx );
fd_snapshot_load_fini( snap_ctx );
}

/* Load incremental */
Expand Down Expand Up @@ -1874,7 +1921,7 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
fd_runtime_update_leaders(ctx->slot_ctx, ctx->slot_ctx->slot_bank.slot);

ctx->slot_ctx->slot_bank.prev_slot = 0UL;
ctx->slot_ctx->slot_bank.slot = 1UL;
ctx->slot_ctx->slot_bank.slot = 1UL;

ulong hashcnt_per_slot = ctx->slot_ctx->epoch_ctx->epoch_bank.hashes_per_tick * ctx->slot_ctx->epoch_ctx->epoch_bank.ticks_per_slot;
while(hashcnt_per_slot--) {
Expand All @@ -1886,27 +1933,23 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
FD_TEST( fd_runtime_block_execute_finalize_tpool( ctx->slot_ctx, NULL, &info, ctx->tpool ) == 0 );

ctx->slot_ctx->slot_bank.prev_slot = 0UL;
ctx->slot_ctx->slot_bank.slot = 1UL;
snapshot_slot = 1UL;
ctx->slot_ctx->slot_bank.slot = 1UL;
snapshot_slot = 1UL;

FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( ctx->slot_ctx, ctx->slot_ctx->funk_txn, ctx->tpool );
fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));

fd_blockstore_start_write( ctx->slot_ctx->blockstore );
fd_blockstore_init( ctx->slot_ctx->blockstore, ctx->blockstore_fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank );
fd_blockstore_end_write( ctx->slot_ctx->blockstore );
}
fd_fseq_update( ctx->wmk, snapshot_slot );

ctx->curr_slot = snapshot_slot;
ctx->parent_slot = ctx->slot_ctx->slot_bank.prev_slot;
ctx->snapshot_slot = snapshot_slot;
ctx->blockhash = ( fd_hash_t ){ .hash = { 0 } };
ctx->flags = 0;
ctx->txn_cnt = 0;
ctx->flags = 0UL;
ctx->txn_cnt = 0UL;

/* Initialize consensus structures post-snapshot */

Expand Down Expand Up @@ -1972,7 +2015,6 @@ init_snapshot( fd_replay_tile_ctx_t * ctx,
ctx->epoch_ctx->bank_hash_cmp = ctx->bank_hash_cmp;
init_after_snapshot( ctx );

publish_stake_weights( ctx, stem, ctx->slot_ctx );
} FD_SCRATCH_SCOPE_END;


Expand Down
3 changes: 2 additions & 1 deletion src/flamenco/runtime/fd_blockstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ wrap_offset( fd_blockstore_archiver_t * archvr, ulong off ) {

/* Build the archival file index */

static inline void build_idx( fd_blockstore_t * blockstore, int fd ) {
static inline void FD_FN_UNUSED
build_idx( fd_blockstore_t * blockstore, int fd ) {
if ( FD_UNLIKELY( fd == -1 ) ) {
return;
}
Expand Down
84 changes: 67 additions & 17 deletions src/flamenco/snapshot/fd_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include <assert.h>
#include <errno.h>

/* FIXME: don't hardcode this param */
#define ZSTD_WINDOW_SZ (33554432UL)

struct fd_snapshot_load_ctx {
/* User-defined parameters. */
const char * snapshot_file;
Expand All @@ -31,12 +34,13 @@ struct fd_snapshot_load_ctx {
typedef struct fd_snapshot_load_ctx fd_snapshot_load_ctx_t;

static void
fd_hashes_load(fd_exec_slot_ctx_t * slot_ctx) {
FD_BORROWED_ACCOUNT_DECL(block_hashes_rec);
int err = fd_acc_mgr_view(slot_ctx->acc_mgr, slot_ctx->funk_txn, &fd_sysvar_recent_block_hashes_id, block_hashes_rec);
fd_hashes_load( fd_exec_slot_ctx_t * slot_ctx ) {
FD_BORROWED_ACCOUNT_DECL( block_hashes_rec );
int err = fd_acc_mgr_view( slot_ctx->acc_mgr, slot_ctx->funk_txn, &fd_sysvar_recent_block_hashes_id, block_hashes_rec );

if( err != FD_ACC_MGR_SUCCESS )
if( err != FD_ACC_MGR_SUCCESS ) {
FD_LOG_ERR(( "missing recent block hashes account" ));
}

fd_bincode_decode_ctx_t ctx = {
.data = block_hashes_rec->const_data,
Expand All @@ -46,15 +50,17 @@ fd_hashes_load(fd_exec_slot_ctx_t * slot_ctx) {

fd_recent_block_hashes_decode( &slot_ctx->slot_bank.recent_block_hashes, &ctx );

/* FIXME: Do not hardcode the number of vote accounts */

slot_ctx->slot_bank.stake_account_keys.stake_accounts_root = NULL;
slot_ctx->slot_bank.stake_account_keys.stake_accounts_pool = fd_stake_accounts_pair_t_map_alloc(slot_ctx->valloc, 100000);
slot_ctx->slot_bank.stake_account_keys.stake_accounts_pool = fd_stake_accounts_pair_t_map_alloc( slot_ctx->valloc, 100000UL );

slot_ctx->slot_bank.vote_account_keys.vote_accounts_root = NULL;
slot_ctx->slot_bank.vote_account_keys.vote_accounts_pool = fd_vote_accounts_pair_t_map_alloc(slot_ctx->valloc, 100000);
slot_ctx->slot_bank.vote_account_keys.vote_accounts_pool = fd_vote_accounts_pair_t_map_alloc( slot_ctx->valloc, 100000UL );

slot_ctx->slot_bank.collected_execution_fees = 0;
slot_ctx->slot_bank.collected_priority_fees = 0;
slot_ctx->slot_bank.collected_rent = 0;
slot_ctx->slot_bank.collected_execution_fees = 0UL;
slot_ctx->slot_bank.collected_priority_fees = 0UL;
slot_ctx->slot_bank.collected_rent = 0UL;

fd_runtime_save_slot_bank( slot_ctx );
fd_runtime_save_epoch_bank( slot_ctx );
Expand Down Expand Up @@ -127,8 +133,7 @@ fd_snapshot_load_init( fd_snapshot_load_ctx_t * ctx ) {
memset( &xid, 0xc3, sizeof(xid) );
ctx->child_txn = fd_funk_txn_prepare( ctx->slot_ctx->acc_mgr->funk, ctx->child_txn, &xid, 0 );
ctx->slot_ctx->funk_txn = ctx->child_txn;
}

}
}

void
Expand All @@ -139,9 +144,6 @@ fd_snapshot_load_manifest_and_status_cache( fd_snapshot_load_ctx_t * ctx ) {
char * snapshot_cstr = fd_scratch_alloc( 1UL, slen + 1 );
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( snapshot_cstr ), ctx->snapshot_file, slen ) );

/* FIXME don't hardcode this param */
static ulong const zstd_window_sz = 33554432UL;

fd_snapshot_src_t src[1];
if( FD_UNLIKELY( !fd_snapshot_src_parse( src, snapshot_cstr ) ) ) {
FD_LOG_ERR(( "Failed to load snapshot" ));
Expand All @@ -154,18 +156,18 @@ fd_snapshot_load_manifest_and_status_cache( fd_snapshot_load_ctx_t * ctx ) {
fd_funk_txn_t * funk_txn = ctx->slot_ctx->funk_txn;

void * restore_mem = fd_valloc_malloc( valloc, fd_snapshot_restore_align(), fd_snapshot_restore_footprint() );
void * loader_mem = fd_valloc_malloc( valloc, fd_snapshot_loader_align(), fd_snapshot_loader_footprint( zstd_window_sz ) );
void * loader_mem = fd_valloc_malloc( valloc, fd_snapshot_loader_align(), fd_snapshot_loader_footprint( ZSTD_WINDOW_SZ ) );

ctx->restore = fd_snapshot_restore_new( restore_mem, acc_mgr, funk_txn, valloc, ctx->slot_ctx, restore_manifest, restore_status_cache );
ctx->loader = fd_snapshot_loader_new ( loader_mem, zstd_window_sz );
ctx->loader = fd_snapshot_loader_new ( loader_mem, ZSTD_WINDOW_SZ );

if( FD_UNLIKELY( !ctx->restore || !ctx->loader ) ) {
fd_valloc_free( valloc, fd_snapshot_loader_delete ( ctx->loader ) );
fd_valloc_free( valloc, fd_snapshot_restore_delete( ctx->restore ) );
FD_LOG_ERR(( "Failed to load snapshot" ));
}

if( FD_UNLIKELY( !fd_snapshot_loader_init( ctx->loader, ctx->restore, src, ctx->slot_ctx->slot_bank.slot ) ) ) {
if( FD_UNLIKELY( !fd_snapshot_loader_init( ctx->loader, ctx->restore, src, ctx->slot_ctx->slot_bank.slot, 1 ) ) ) {
FD_LOG_ERR(( "Failed to init snapshot loader" ));
}

Expand Down Expand Up @@ -282,3 +284,51 @@ fd_snapshot_load_all( const char * source_cstr,

} FD_SCRATCH_SCOPE_END;
}

/* fd_snapshot_load_prefetch_manifest opens the snapshot source named by
   ctx->snapshot_file, creates a restore object and a loader for it
   (stored in ctx->restore and ctx->loader) and advances the loader
   until it reports MANIFEST_DONE.  The funk write bracket
   (fd_funk_start_write / fd_funk_end_write) is held for the whole
   call.  Every failure is reported through FD_LOG_ERR.

   The loader is initialized with a trailing argument of 0, whereas
   fd_snapshot_load_manifest_and_status_cache passes 1.

   NOTE(review): on the success path ctx->restore and ctx->loader are
   left allocated; nothing in this function releases them.  Confirm
   that the caller (or a later fini step) is responsible for that. */
void
fd_snapshot_load_prefetch_manifest( fd_snapshot_load_ctx_t * ctx ) {

  fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );

  /* Make a private, NUL-terminated copy of the snapshot path in
     scratch memory and hand that copy to the source parser. */
  size_t path_len  = strlen( ctx->snapshot_file );
  char * path_copy = fd_scratch_alloc( 8UL, path_len + 1 );
  fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( path_copy ), ctx->snapshot_file, path_len ) );

  fd_snapshot_src_t src[1];
  if( FD_UNLIKELY( !fd_snapshot_src_parse( src, path_copy ) ) ) {
    FD_LOG_ERR(( "Failed to load snapshot" ));
  }

  fd_valloc_t     alloc = ctx->slot_ctx->valloc;
  fd_acc_mgr_t *  mgr   = ctx->slot_ctx->acc_mgr;
  fd_funk_txn_t * txn   = ctx->slot_ctx->funk_txn;

  /* Backing memory for the restore object and the loader. */
  void * restore_buf = fd_valloc_malloc( alloc, fd_snapshot_restore_align(), fd_snapshot_restore_footprint() );
  void * loader_buf  = fd_valloc_malloc( alloc, fd_snapshot_loader_align(),  fd_snapshot_loader_footprint( ZSTD_WINDOW_SZ ) );

  ctx->restore = fd_snapshot_restore_new( restore_buf, mgr, txn, alloc, ctx->slot_ctx, restore_manifest, restore_status_cache );
  ctx->loader  = fd_snapshot_loader_new ( loader_buf, ZSTD_WINDOW_SZ );

  if( FD_UNLIKELY( !( ctx->restore && ctx->loader ) ) ) {
    /* One of the two constructors failed: release both before
       reporting the error. */
    fd_valloc_free( alloc, fd_snapshot_loader_delete ( ctx->loader  ) );
    fd_valloc_free( alloc, fd_snapshot_restore_delete( ctx->restore ) );
    FD_LOG_ERR(( "Failed to load snapshot" ));
  }

  if( FD_UNLIKELY( !fd_snapshot_loader_init( ctx->loader, ctx->restore, src, ctx->slot_ctx->slot_bank.slot, 0 ) ) ) {
    FD_LOG_ERR(( "Failed to init snapshot loader" ));
  }

  /* Pump the loader until the manifest has been loaded.  A return of
     0 means "keep going"; any other value except MANIFEST_DONE
     (including reaching the end of the snapshot first) is unexpected
     and is reported as an error. */
  for(;;) {
    int rc = fd_snapshot_loader_advance( ctx->loader );
    if( rc==MANIFEST_DONE ) break;
    if( FD_UNLIKELY( rc ) ) {
      FD_LOG_ERR(( "Failed to load snapshot (%d-%s)", rc, fd_io_strerror( rc ) ));
    }
  }

  fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
}
3 changes: 3 additions & 0 deletions src/flamenco/snapshot/fd_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ fd_snapshot_load_all( const char * source_cstr,
uint check_hash,
int snapshot_type );

/* fd_snapshot_load_prefetch_manifest creates a restore object and a
   loader for the snapshot named in ctx (stored in ctx->restore and
   ctx->loader) and advances the loader only until the manifest has
   been loaded.  The funk write bracket is held for the duration of
   the call.  Failures are reported via FD_LOG_ERR. */
void
fd_snapshot_load_prefetch_manifest( fd_snapshot_load_ctx_t * ctx );

FD_PROTOTYPES_END

#endif /* FD_HAS_ZSTD */
Expand Down
27 changes: 16 additions & 11 deletions src/flamenco/snapshot/fd_snapshot_base.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
fd_snapshot_name_t *
fd_snapshot_name_from_buf( fd_snapshot_name_t * id,
char const * str,
ulong str_len,
ulong base_slot ) {
ulong str_len ) {
char buf[ 4096 ];
str_len = fd_ulong_min( sizeof(buf)-1, str_len );
fd_memcpy( buf, str, str_len );
buf[ str_len ] = '\0';

return fd_snapshot_name_from_cstr( id, buf, base_slot );
return fd_snapshot_name_from_cstr( id, buf );
}

fd_snapshot_name_t *
fd_snapshot_name_from_cstr( fd_snapshot_name_t * id,
char const * cstr,
ulong base_slot ) {
char const * cstr ) {

fd_memset( id, 0, sizeof(fd_snapshot_name_t) );

Expand Down Expand Up @@ -53,12 +51,6 @@ fd_snapshot_name_from_cstr( fd_snapshot_name_t * id,
return NULL;
}
cstr = endptr+1;

if( base_slot != id->slot ) {
FD_LOG_WARNING(( "failed to load snapshot: \"%s\"", orig_cstr ));
FD_LOG_WARNING(( "expected base slot %lu but got %lu, incremental snapshot does not match full snapshot", base_slot, id->slot ));
return NULL;
}
}

char const * file_ext = strchr( cstr, '.' );
Expand All @@ -77,3 +69,16 @@ fd_snapshot_name_from_cstr( fd_snapshot_name_t * id,
}
return id;
}

/* fd_snapshot_name_slot_validate checks that an incremental snapshot
   name refers to the expected base slot.

   Returns 0 if id is not of type FD_SNAPSHOT_TYPE_INCREMENTAL, or if
   it is and id->slot equals base_slot.  Otherwise logs a warning and
   returns -1. */
int
fd_snapshot_name_slot_validate( fd_snapshot_name_t * id, ulong base_slot ) {

  /* Only incremental snapshots are checked against the base slot. */
  int const is_incremental = ( id->type==FD_SNAPSHOT_TYPE_INCREMENTAL );
  if( !is_incremental || id->slot==base_slot ) return 0;

  FD_LOG_WARNING(( "expected base slot %lu but got %lu, incremental snapshot does not match full snapshot", base_slot, id->slot ));
  return -1;
}
11 changes: 7 additions & 4 deletions src/flamenco/snapshot/fd_snapshot_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ FD_PROTOTYPES_BEGIN

fd_snapshot_name_t *
fd_snapshot_name_from_cstr( fd_snapshot_name_t * id,
char const * cstr,
ulong base_slot );
char const * cstr );

fd_snapshot_name_t *
fd_snapshot_name_from_buf( fd_snapshot_name_t * id,
char const * str,
ulong str_len,
ulong base_slot );
ulong str_len );

/* fd_snapshot_name_slot_validate returns 0 if id is not an incremental
   snapshot name, or if it is one and id->slot matches base_slot.
   Returns -1 (and logs a warning) if id is incremental and its slot
   differs from base_slot. */
int
fd_snapshot_name_slot_validate( fd_snapshot_name_t * id,
ulong base_slot );


FD_PROTOTYPES_END

Expand Down
4 changes: 2 additions & 2 deletions src/flamenco/snapshot/fd_snapshot_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ fd_snapshot_http_follow_redirect( fd_snapshot_http_t * this,

FD_LOG_NOTICE(( "Following redirect to %.*s", (int)loc_len, loc ));

if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, loc, loc_len, this->base_slot ) ) ) {
if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, loc, loc_len ) ) ) {
return EPROTO;
}

Expand Down Expand Up @@ -387,7 +387,7 @@ fd_snapshot_http_resp( fd_snapshot_http_t * this ) {
if( FD_UNLIKELY( this->name_out->type == FD_SNAPSHOT_TYPE_UNSPECIFIED ) ) {
/* We must not have followed a redirect. Try to parse here. */
ulong off = (ulong)this->path_off + 4;
if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, this->path + off, sizeof(this->path) - off, this->base_slot ) ) ) {
if( FD_UNLIKELY( !fd_snapshot_name_from_buf( this->name_out, this->path + off, sizeof(this->path) - off ) ) ) {
FD_LOG_WARNING(( "Cannot download, snapshot hash is unknown" ));
this->state = FD_SNAPSHOT_HTTP_STATE_FAIL;
return EINVAL;
Expand Down
Loading

0 comments on commit e0465c9

Please sign in to comment.