From a439935c6eb00d1d77b4032fd0523bbcc9589e56 Mon Sep 17 00:00:00 2001 From: ibhatt-jumptrading Date: Thu, 9 Jan 2025 20:37:34 +0000 Subject: [PATCH] snapshot: break out snapshot load into functions --- src/app/fdctl/run/tiles/fd_replay.c | 4 +- src/app/ledger/main.c | 16 +- src/disco/consensus/test_consensus.c | 2 +- src/flamenco/snapshot/fd_snapshot.c | 255 ++++++++++++------ src/flamenco/snapshot/fd_snapshot.h | 72 ++++- src/flamenco/snapshot/fd_snapshot_http.h | 5 +- src/flamenco/snapshot/fd_snapshot_istream.c | 13 +- src/flamenco/snapshot/fd_snapshot_loader.c | 20 +- src/flamenco/snapshot/fd_snapshot_loader.h | 20 +- src/flamenco/snapshot/fd_snapshot_restore.c | 8 +- src/flamenco/snapshot/fd_snapshot_restore.h | 9 + .../snapshot/fd_snapshot_restore_private.h | 2 +- src/flamenco/snapshot/test_snapshot_restore.c | 24 +- src/util/archive/fd_tar.h | 8 +- src/util/archive/fd_tar_reader.c | 19 +- src/util/archive/fuzz_tar.c | 4 +- 16 files changed, 328 insertions(+), 153 deletions(-) diff --git a/src/app/fdctl/run/tiles/fd_replay.c b/src/app/fdctl/run/tiles/fd_replay.c index 2a6f057f44..9d6b0dda06 100644 --- a/src/app/fdctl/run/tiles/fd_replay.c +++ b/src/app/fdctl/run/tiles/fd_replay.c @@ -1827,7 +1827,7 @@ read_snapshot( void * _ctx, /* Funk already has a snapshot loaded */ fd_runtime_recover_banks( ctx->slot_ctx, 0, 1 ); } else { - fd_snapshot_load( snapshot, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_FULL ); + fd_snapshot_load_all( snapshot, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_FULL ); } /* Load incremental */ @@ -1842,7 +1842,7 @@ read_snapshot( void * _ctx, } if( strlen( incremental ) > 0 ) { - fd_snapshot_load( incremental, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_INCREMENTAL ); + fd_snapshot_load_all( incremental, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_INCREMENTAL ); } if( ctx->replay_plugin_out_mem ) { diff --git a/src/app/ledger/main.c b/src/app/ledger/main.c index 7679b9bc0f..6d2461dc0e 100644 --- a/src/app/ledger/main.c +++ b/src/app/ledger/main.c @@ -1101,11 +1101,11 @@ ingest( fd_ledger_args_t * args ) { /* Load in snapshot(s) */ if( args->snapshot ) { - fd_snapshot_load( args->snapshot, slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash , FD_SNAPSHOT_TYPE_FULL ); + fd_snapshot_load_all( args->snapshot, slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash , FD_SNAPSHOT_TYPE_FULL ); FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } if( args->incremental ) { - fd_snapshot_load( args->incremental, slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); + fd_snapshot_load_all( args->incremental, slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); FD_LOG_NOTICE(( "imported %lu records from incremental snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } @@ -1247,11 +1247,11 @@ replay( fd_ledger_args_t * args ) { if( !rec_cnt ) { /* Load in snapshot(s) */ if( args->snapshot ) { - fd_snapshot_load( args->snapshot, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_FULL ); + fd_snapshot_load_all( args->snapshot, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_FULL ); FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } if( args->incremental ) { - fd_snapshot_load( args->incremental, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); + fd_snapshot_load_all( args->incremental, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } if( args->genesis ) { @@ -1306,11 +1306,11 @@ prune( fd_ledger_args_t * args ) { if( !rec_cnt ) { /* Load in snapshot(s) */ if( args->snapshot ) { - fd_snapshot_load( args->snapshot, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_FULL ); + fd_snapshot_load_all( args->snapshot, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_FULL ); FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } if( args->incremental ) { - fd_snapshot_load( args->incremental, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); + fd_snapshot_load_all( args->incremental, args->slot_ctx, args->tpool, args->verify_acc_hash, args->check_acc_hash, FD_SNAPSHOT_TYPE_INCREMENTAL ); FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } } @@ -1415,11 +1415,11 @@ prune( fd_ledger_args_t * args ) { /* Load in snapshot(s) */ if( args->snapshot ) { - fd_snapshot_load( args->snapshot, args->slot_ctx, args->tpool, 0, 0, FD_SNAPSHOT_TYPE_FULL ); + fd_snapshot_load_all( args->snapshot, args->slot_ctx, args->tpool, 0, 0, FD_SNAPSHOT_TYPE_FULL ); FD_LOG_NOTICE(( "reload: imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } if( args->incremental ) { - fd_snapshot_load( args->incremental, args->slot_ctx, args->tpool, 0, 0, FD_SNAPSHOT_TYPE_INCREMENTAL ); + fd_snapshot_load_all( args->incremental, args->slot_ctx, args->tpool, 0, 0, FD_SNAPSHOT_TYPE_INCREMENTAL ); FD_LOG_NOTICE(( "reload: imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) )); } diff --git a/src/disco/consensus/test_consensus.c b/src/disco/consensus/test_consensus.c index 4958821426..9264017be4 100644 --- a/src/disco/consensus/test_consensus.c +++ b/src/disco/consensus/test_consensus.c @@ -556,7 +556,7 @@ main( void ) { // FD_TEST( epoch_bank ); // FD_TEST( fd_slot_to_epoch( &epoch_bank->epoch_schedule, i, NULL ) == // fd_slot_to_epoch( &epoch_bank->epoch_schedule, j, NULL ) ); -// fd_snapshot_load( incremental_snapshot, snapshot_slot_ctx, 1, 1, FD_SNAPSHOT_TYPE_INCREMENTAL ); +// fd_snapshot_load_all( incremental_snapshot, snapshot_slot_ctx, 1, 1, FD_SNAPSHOT_TYPE_INCREMENTAL ); // } // ulong snapshot_slot = snapshot_slot_ctx->slot_bank.slot; diff --git a/src/flamenco/snapshot/fd_snapshot.c b/src/flamenco/snapshot/fd_snapshot.c index 53ab56b39b..00b02b36f7 100644 --- a/src/flamenco/snapshot/fd_snapshot.c +++ b/src/flamenco/snapshot/fd_snapshot.c @@ -12,6 +12,25 @@ #include #include +struct fd_snapshot_load_ctx { + /* User-defined parameters. */ + const char * snapshot_file; + fd_exec_slot_ctx_t * slot_ctx; + fd_tpool_t * tpool; + uint verify_hash; + uint check_hash; + int snapshot_type; + + /* Internal state. */ + fd_funk_txn_t * par_txn; + fd_funk_txn_t * child_txn; + + void * loader_mem; + void * restore_mem; + fd_snapshot_loader_t * loader; +}; +typedef struct fd_snapshot_load_ctx fd_snapshot_load_ctx_t; + static void fd_hashes_load(fd_exec_slot_ctx_t * slot_ctx) { FD_BORROWED_ACCOUNT_DECL(block_hashes_rec); @@ -54,148 +73,210 @@ restore_status_cache( void * ctx, return (!!fd_exec_slot_ctx_recover_status_cache( ctx, slot_deltas ) ? 0 : EINVAL); } -static void -load_one_snapshot( fd_exec_slot_ctx_t * slot_ctx, - char * source_cstr, - fd_snapshot_name_t * name_out ) { +ulong +fd_snapshot_load_ctx_align( void ) { + return alignof(fd_snapshot_load_ctx_t); +} + +ulong +fd_snapshot_load_ctx_footprint( void ) { + return sizeof(fd_snapshot_load_ctx_t); +} + +fd_snapshot_load_ctx_t * +fd_snapshot_load_new( uchar * mem, + const char * snapshot_file, + fd_exec_slot_ctx_t * slot_ctx, + fd_tpool_t * tpool, + uint verify_hash, + uint check_hash, + int snapshot_type ) { + + fd_snapshot_load_ctx_t * ctx = (fd_snapshot_load_ctx_t *)mem; + ctx->snapshot_file = snapshot_file; + ctx->slot_ctx = slot_ctx; + ctx->tpool = tpool; + ctx->verify_hash = verify_hash; + ctx->check_hash = check_hash; + ctx->snapshot_type = snapshot_type; + return ctx; +} + +void +fd_snapshot_load_init( fd_snapshot_load_ctx_t * ctx ) { + switch( ctx->snapshot_type ) { + case FD_SNAPSHOT_TYPE_UNSPECIFIED: + FD_LOG_ERR(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_UNSPECIFIED)", ctx->snapshot_file, ctx->verify_hash ? "true" : "false", ctx->check_hash ? "true" : "false")); + break; + case FD_SNAPSHOT_TYPE_FULL: + FD_LOG_NOTICE(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_FULL)", ctx->snapshot_file, ctx->verify_hash ? "true" : "false", ctx->check_hash ? "true" : "false")); + break; + case FD_SNAPSHOT_TYPE_INCREMENTAL: + FD_LOG_NOTICE(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_INCREMENTAL)", ctx->snapshot_file, ctx->verify_hash ? "true" : "false", ctx->check_hash ? "true" : "false")); + break; + default: + FD_LOG_ERR(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, huh?)", ctx->snapshot_file, ctx->verify_hash ? "true" : "false", ctx->check_hash ? "true" : "false")); + break; + } + + fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk ); + + ctx->par_txn = ctx->slot_ctx->funk_txn; + ctx->child_txn = ctx->slot_ctx->funk_txn; + if( ctx->verify_hash && FD_FEATURE_ACTIVE( ctx->slot_ctx, incremental_snapshot_only_incremental_hash_calculation ) ) { + fd_funk_txn_xid_t xid; + memset( &xid, 0xc3, sizeof(xid) ); + ctx->child_txn = fd_funk_txn_prepare( ctx->slot_ctx->acc_mgr->funk, ctx->child_txn, &xid, 0 ); + ctx->slot_ctx->funk_txn = ctx->child_txn; + } + +} + +void +fd_snapshot_load_manifest_and_status_cache( fd_snapshot_load_ctx_t * ctx ) { + + fd_scratch_push(); + size_t slen = strlen( ctx->snapshot_file ); + char * snapshot_cstr = fd_scratch_alloc( 1UL, slen + 1 ); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( snapshot_cstr ), ctx->snapshot_file, slen ) ); /* FIXME don't hardcode this param */ static ulong const zstd_window_sz = 33554432UL; fd_snapshot_src_t src[1]; - if( FD_UNLIKELY( !fd_snapshot_src_parse( src, source_cstr ) ) ) { + if( FD_UNLIKELY( !fd_snapshot_src_parse( src, snapshot_cstr ) ) ) { FD_LOG_ERR(( "Failed to load snapshot" )); } - fd_exec_epoch_ctx_bank_mem_clear( slot_ctx->epoch_ctx ); + fd_exec_epoch_ctx_bank_mem_clear( ctx->slot_ctx->epoch_ctx ); - fd_valloc_t valloc = slot_ctx->valloc; - fd_acc_mgr_t * acc_mgr = slot_ctx->acc_mgr; - fd_funk_txn_t * funk_txn = slot_ctx->funk_txn; + fd_valloc_t valloc = ctx->slot_ctx->valloc; + fd_acc_mgr_t * acc_mgr = ctx->slot_ctx->acc_mgr; + fd_funk_txn_t * funk_txn = ctx->slot_ctx->funk_txn; void * restore_mem = fd_valloc_malloc( valloc, fd_snapshot_restore_align(), fd_snapshot_restore_footprint() ); void * loader_mem = fd_valloc_malloc( valloc, fd_snapshot_loader_align(), fd_snapshot_loader_footprint( zstd_window_sz ) ); - fd_snapshot_restore_t * restore = fd_snapshot_restore_new( restore_mem, acc_mgr, funk_txn, valloc, slot_ctx, restore_manifest, restore_status_cache ); - fd_snapshot_loader_t * loader = fd_snapshot_loader_new ( loader_mem, zstd_window_sz ); + fd_snapshot_restore_t * restore = fd_snapshot_restore_new( restore_mem, acc_mgr, funk_txn, valloc, ctx->slot_ctx, restore_manifest, restore_status_cache ); + ctx->loader = fd_snapshot_loader_new ( loader_mem, zstd_window_sz ); - if( FD_UNLIKELY( !restore || !loader ) ) { + if( FD_UNLIKELY( !restore || !ctx->loader ) ) { fd_valloc_free( valloc, fd_snapshot_loader_delete ( loader_mem ) ); fd_valloc_free( valloc, fd_snapshot_restore_delete( restore_mem ) ); FD_LOG_ERR(( "Failed to load snapshot" )); } - if( FD_UNLIKELY( !fd_snapshot_loader_init( loader, restore, src, slot_ctx->slot_bank.slot ) ) ) { + if( FD_UNLIKELY( !fd_snapshot_loader_init( ctx->loader, restore, src, ctx->slot_ctx->slot_bank.slot ) ) ) { FD_LOG_ERR(( "Failed to init snapshot loader" )); } + /* First load in the manifest. */ for(;;) { - int err = fd_snapshot_loader_advance( loader ); - if( FD_LIKELY( err == 0 ) ) continue; - if( err == -1 ) break; + int err = fd_snapshot_loader_advance( ctx->loader ); + if( err==MANIFEST_DONE ) break; /* We have finished loading in the manifest. */ + if( FD_LIKELY( !err ) ) continue; /* Keep going. */ + /* If we have reached the end of the snapshot(err==-1), throw an error because + this is not expected. */ FD_LOG_ERR(( "Failed to load snapshot (%d-%s)", err, fd_io_strerror( err ) )); } - fd_snapshot_name_t const * name = fd_snapshot_loader_get_name( loader ); +} + +void +fd_snapshot_load_accounts( fd_snapshot_load_ctx_t * ctx ) { + /* Now, that the manifest is done being read in. Read in the rest of the accounts. */ + for(;;) { + int err = fd_snapshot_loader_advance( ctx->loader ); + if( err==-1 ) break; /* We have finished loading in the snapshot. */ + if( FD_LIKELY( err==0 ) ) continue; /* Keep going. */ + + FD_LOG_ERR(( "Failed to load snapshot (%d-%s)", err, fd_io_strerror( err ) )); + } + + fd_snapshot_name_t const * name = fd_snapshot_loader_get_name( ctx->loader ); if( FD_UNLIKELY( !name ) ) FD_LOG_ERR(( "name is NULL" )); - *name_out = *name; - fd_valloc_free( valloc, fd_snapshot_loader_delete ( loader_mem ) ); - fd_valloc_free( valloc, fd_snapshot_restore_delete( restore_mem ) ); + fd_valloc_free( ctx->slot_ctx->valloc, fd_snapshot_loader_delete ( ctx->loader_mem ) ); + fd_valloc_free( ctx->slot_ctx->valloc, fd_snapshot_restore_delete( ctx->restore_mem ) ); - FD_LOG_NOTICE(( "Finished reading snapshot %s", source_cstr )); + FD_LOG_NOTICE(( "Finished reading snapshot %s", ctx->snapshot_file )); } - void -fd_snapshot_load( const char * snapshotfile, - fd_exec_slot_ctx_t * slot_ctx, - fd_tpool_t * tpool, - uint verify_hash, - uint check_hash, - int snapshot_type ) { - - switch (snapshot_type) { - case FD_SNAPSHOT_TYPE_UNSPECIFIED: - FD_LOG_ERR(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_UNSPECIFIED)", snapshotfile, verify_hash ? "true" : "false", check_hash ? "true" : "false")); - break; - case FD_SNAPSHOT_TYPE_FULL: - FD_LOG_NOTICE(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_FULL)", snapshotfile, verify_hash ? "true" : "false", check_hash ? "true" : "false")); - break; - case FD_SNAPSHOT_TYPE_INCREMENTAL: - FD_LOG_NOTICE(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, FD_SNAPSHOT_TYPE_INCREMENTAL)", snapshotfile, verify_hash ? "true" : "false", check_hash ? "true" : "false")); - break; - default: - FD_LOG_ERR(("fd_snapshot_load(\"%s\", verify-hash=%s, check-hash=%s, huh?)", snapshotfile, verify_hash ? "true" : "false", check_hash ? "true" : "false")); - break; - } - - fd_funk_start_write( slot_ctx->acc_mgr->funk ); +fd_snapshot_load_fini( fd_snapshot_load_ctx_t * ctx ) { - fd_funk_txn_t * par_txn = slot_ctx->funk_txn; - fd_funk_txn_t * child_txn = slot_ctx->funk_txn; - if( verify_hash && FD_FEATURE_ACTIVE(slot_ctx, incremental_snapshot_only_incremental_hash_calculation) ) { - fd_funk_txn_xid_t xid; - memset( &xid, 0xc3, sizeof( xid ) ); - child_txn = fd_funk_txn_prepare( slot_ctx->acc_mgr->funk, child_txn, &xid, 0 ); - slot_ctx->funk_txn = child_txn; - } + fd_snapshot_name_t const * name = fd_snapshot_loader_get_name( ctx->loader ); + fd_hash_t const * fhash = &name->fhash; - fd_scratch_push(); - size_t slen = strlen( snapshotfile ); - char * snapshot_cstr = fd_scratch_alloc( 1UL, slen + 1 ); - fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( snapshot_cstr ), snapshotfile, slen ) ); - fd_snapshot_name_t name = {0}; - load_one_snapshot( slot_ctx, snapshot_cstr, &name ); - fd_hash_t const * fhash = &name.fhash; - fd_scratch_pop(); - - if( name.type != snapshot_type ) { - FD_LOG_ERR(( "snapshot %s is wrong type", snapshotfile )); + if( name->type != ctx->snapshot_type ) { + FD_LOG_ERR(( "snapshot %s is wrong type", ctx->snapshot_file )); } // In order to calculate the snapshot hash, we need to know what features are active... - fd_features_restore( slot_ctx ); - fd_calculate_epoch_accounts_hash_values( slot_ctx ); + fd_features_restore( ctx->slot_ctx ); + fd_calculate_epoch_accounts_hash_values( ctx->slot_ctx ); - if( verify_hash ) { - if (snapshot_type == FD_SNAPSHOT_TYPE_FULL) { + if( ctx->verify_hash ) { + if( ctx->snapshot_type==FD_SNAPSHOT_TYPE_FULL ) { fd_hash_t accounts_hash; - fd_snapshot_hash(slot_ctx, tpool, &accounts_hash, check_hash); + fd_snapshot_hash(ctx->slot_ctx, ctx->tpool, &accounts_hash, ctx->check_hash ); - if (memcmp(fhash->uc, accounts_hash.uc, 32) != 0) + if( memcmp( fhash->uc, accounts_hash.uc, sizeof(fd_hash_t) ) ) { FD_LOG_ERR(( "snapshot accounts_hash (calculated) %s != (expected) %s", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ), FD_BASE58_ENC_32_ALLOCA( fhash->uc ) )); - else - FD_LOG_NOTICE(( "snapshot accounts_hash %s verified successfully", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash) )); - } else if (snapshot_type == FD_SNAPSHOT_TYPE_INCREMENTAL) { + } else { + FD_LOG_NOTICE(( "snapshot accounts_hash %s verified successfully", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ) )); + } + } else if( ctx->snapshot_type == FD_SNAPSHOT_TYPE_INCREMENTAL) { fd_hash_t accounts_hash; - if (FD_FEATURE_ACTIVE(slot_ctx, incremental_snapshot_only_incremental_hash_calculation)) { + if( FD_FEATURE_ACTIVE( ctx->slot_ctx, incremental_snapshot_only_incremental_hash_calculation ) ) { FD_LOG_NOTICE(( "hashing incremental snapshot with only deltas" )); - fd_accounts_hash_inc_only(slot_ctx, &accounts_hash, child_txn, check_hash); + fd_accounts_hash_inc_only( ctx->slot_ctx, &accounts_hash, ctx->child_txn, ctx->check_hash ); } else { FD_LOG_NOTICE(( "hashing incremental snapshot with all accounts" )); - fd_snapshot_hash(slot_ctx, tpool, &accounts_hash, check_hash); + fd_snapshot_hash( ctx->slot_ctx, ctx->tpool, &accounts_hash, ctx->check_hash ); } - if (memcmp(fhash->uc, accounts_hash.uc, 32) != 0) - FD_LOG_ERR(("incremental accounts_hash %s != %s", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ), FD_BASE58_ENC_32_ALLOCA( fhash->uc ) )); - else - FD_LOG_NOTICE(("incremental accounts_hash %s verified successfully", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ) )); + if( memcmp( fhash->uc, accounts_hash.uc, sizeof(fd_hash_t) ) ) { + FD_LOG_ERR(( "incremental accounts_hash %s != %s", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ), FD_BASE58_ENC_32_ALLOCA( fhash->uc ) )); + } else { + FD_LOG_NOTICE(( "incremental accounts_hash %s verified successfully", FD_BASE58_ENC_32_ALLOCA( accounts_hash.hash ) )); + } } else { - FD_LOG_ERR(( "invalid snapshot type %d", snapshot_type )); + FD_LOG_ERR(( "invalid snapshot type %d", ctx->snapshot_type )); } } - if( child_txn != par_txn ) { - fd_funk_txn_publish( slot_ctx->acc_mgr->funk, child_txn, 0 ); - slot_ctx->funk_txn = par_txn; + if( ctx->child_txn != ctx->par_txn ) { + fd_funk_txn_publish( ctx->slot_ctx->acc_mgr->funk, ctx->child_txn, 0 ); + ctx->slot_ctx->funk_txn = ctx->par_txn; } - fd_hashes_load(slot_ctx); + fd_hashes_load( ctx->slot_ctx ); + + fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx ); + + fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk ); +} + +void +fd_snapshot_load_all( const char * source_cstr, + fd_exec_slot_ctx_t * slot_ctx, + fd_tpool_t * tpool, + uint verify_hash, + uint check_hash, + int snapshot_type ) { + + FD_SCRATCH_SCOPE_BEGIN { + + uchar * mem = fd_scratch_alloc( fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() ); + fd_snapshot_load_ctx_t * ctx = fd_snapshot_load_new( mem, source_cstr, slot_ctx, tpool, verify_hash, check_hash, snapshot_type ); - fd_rewards_recalculate_partitioned_rewards( slot_ctx ); + fd_snapshot_load_init( ctx ); + fd_snapshot_load_manifest_and_status_cache( ctx ); + fd_snapshot_load_accounts( ctx ); + fd_snapshot_load_fini( ctx ); - fd_funk_end_write( slot_ctx->acc_mgr->funk ); + } FD_SCRATCH_SCOPE_END; } diff --git a/src/flamenco/snapshot/fd_snapshot.h b/src/flamenco/snapshot/fd_snapshot.h index 43619adb2f..2c6ae7be61 100644 --- a/src/flamenco/snapshot/fd_snapshot.h +++ b/src/flamenco/snapshot/fd_snapshot.h @@ -1,6 +1,7 @@ #ifndef HEADER_fd_src_flamenco_snapshot_fd_snapshot_h #define HEADER_fd_src_flamenco_snapshot_fd_snapshot_h +#include "../../funk/fd_funk_txn.h" #if FD_HAS_ZSTD /* fd_snapshot.h provides high-level blocking APIs for Solana snapshots. */ @@ -9,10 +10,36 @@ FD_PROTOTYPES_BEGIN -/* fd_snapshot_load does a blocking load of a snapshot. +struct fd_snapshot_load_ctx; +typedef struct fd_snapshot_load_ctx fd_snapshot_load_ctx_t; - source_cstr is either a local file system path (absolute or relative) - or a HTTP URL (must start with 'http://', HTTPS not yet supported). +/* fd_snapshot_load_all does a blocking load of a snapshot. It is a wrapper + around fd_snapshot_load_new, fd_snapshot_load_init, + fd_snapshot_load_manifest_and_status_cache, and fd_snapshot_load_fini. + + fd_snapshot_load_new sets up the context that is passed around for the + other fd_snapshot_load_* functions. + + fd_snapshot_load_init starts the process of loading in the snapshot and + sets up the funk transactions, etc.. + + fd_snapshot_load_manifest_and_status_cache loads in the manifest and the + status cache exiting as soon as the manifest is loaded in. + + fd_snapshot_load_accounts will load in the rest of the snapshot file but + the runtime is still not setup to run. + + fd_snapshot_load_fini will use the slot context and funk which are now + populated to setup the runtime and finalize the snapshot load. + + The reason these are broken out is to support loading in the manifest as + quickly as possible, allowing for other operations to start (like + stake-weighted repair) while the rest of the snapshot is being loaded. This + is needed as loading in the manifest only takes a few seconds and the overall + time to load in a snapshot is dominated by loading in the append vecs. + + source_cstr is either a local file system path. + TODO: Add support for an HTTP url. slot_ctx is a valid initialized slot context (a funk, acc_mgr, heap valloc, zero-initialized slot bank). @@ -26,13 +53,40 @@ FD_PROTOTYPES_BEGIN snapshot_type is one of FD_SNAPSHOT_TYPE_{...}. */ +ulong +fd_snapshot_load_ctx_align( void ); + +ulong +fd_snapshot_load_ctx_footprint( void ); + +fd_snapshot_load_ctx_t * +fd_snapshot_load_new( uchar * mem, + const char * snapshot_file, + fd_exec_slot_ctx_t * slot_ctx, + fd_tpool_t * tpool, + uint verify_hash, + uint check_hash, + int snapshot_type ); + +void +fd_snapshot_load_init( fd_snapshot_load_ctx_t * ctx ); + +void +fd_snapshot_load_manifest_and_status_cache( fd_snapshot_load_ctx_t * ctx ); + +void +fd_snapshot_load_accounts( fd_snapshot_load_ctx_t * ctx ); + +void +fd_snapshot_load_fini( fd_snapshot_load_ctx_t * ctx ); + void -fd_snapshot_load( const char * source_cstr, - fd_exec_slot_ctx_t * slot_ctx, - fd_tpool_t * tpool, - uint verify_hash, - uint check_hash, - int snapshot_type ); +fd_snapshot_load_all( const char * source_cstr, + fd_exec_slot_ctx_t * slot_ctx, + fd_tpool_t * tpool, + uint verify_hash, + uint check_hash, + int snapshot_type ); FD_PROTOTYPES_END diff --git a/src/flamenco/snapshot/fd_snapshot_http.h b/src/flamenco/snapshot/fd_snapshot_http.h index f57fd512c6..0610eb3435 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.h +++ b/src/flamenco/snapshot/fd_snapshot_http.h @@ -2,7 +2,6 @@ #define HEADER_fd_src_flamenco_snapshot_fd_snapshot_http_h #include "fd_snapshot.h" -#include "fd_snapshot_loader.h" #include "fd_snapshot_istream.h" /* fd_snapshot_http.h provides APIs for streaming download of Solana @@ -32,6 +31,8 @@ /* fd_snapshot_http_t is the snapshot HTTP client class. */ +FD_PROTOTYPES_BEGIN + struct fd_snapshot_http { uint next_ipv4; /* big-endian, see fd_ip4.h */ ushort next_port; @@ -83,8 +84,6 @@ struct fd_snapshot_http { typedef struct fd_snapshot_http fd_snapshot_http_t; -FD_PROTOTYPES_BEGIN - fd_snapshot_http_t * fd_snapshot_http_new( void * mem, const char * dst_str, diff --git a/src/flamenco/snapshot/fd_snapshot_istream.c b/src/flamenco/snapshot/fd_snapshot_istream.c index ed8b6c5732..7cc3ac2043 100644 --- a/src/flamenco/snapshot/fd_snapshot_istream.c +++ b/src/flamenco/snapshot/fd_snapshot_istream.c @@ -1,4 +1,5 @@ #include "fd_snapshot_istream.h" +#include "fd_snapshot_restore.h" #include "../../util/fd_util.h" #include @@ -142,17 +143,21 @@ fd_tar_io_reader_advance( fd_tar_io_reader_t * this ) { if( FD_LIKELY( read_err==0 ) ) { /* ok */ } else if( read_err<0 ) { /* EOF */ return -1; /* TODO handle unexpected EOF case */ } else { - FD_LOG_WARNING(( "snapshot tar stream failed (%d-%s)", read_err, fd_io_strerror( read_err ) )); + FD_LOG_WARNING(( "Snapshot tar stream failed (%d-%s)", read_err, fd_io_strerror( read_err ) )); return read_err; } - int tar_err = fd_tar_read( this->reader, buf, buf_sz ); + int tar_err = fd_tar_read( this->reader, buf, buf_sz, MANIFEST_DONE ); + if( tar_err==MANIFEST_DONE ) { + FD_LOG_NOTICE(( "Finished reading manifest" )); + return tar_err; + } if( FD_UNLIKELY( tar_err>0 ) ) { - FD_LOG_WARNING(( "snapshot tar stream failed (%d-%s)", tar_err, fd_io_strerror( tar_err ) )); + FD_LOG_WARNING(( "Snapshot tar stream failed (%d-%s)", tar_err, fd_io_strerror( tar_err ) )); return tar_err; } if( tar_err<0 ) { - FD_LOG_NOTICE(( "encountered end of tar stream" )); + FD_LOG_NOTICE(( "Encountered end of tar stream" )); return -1; } diff --git a/src/flamenco/snapshot/fd_snapshot_loader.c b/src/flamenco/snapshot/fd_snapshot_loader.c index 2aa46c5480..8e5ac2ad85 100644 --- a/src/flamenco/snapshot/fd_snapshot_loader.c +++ b/src/flamenco/snapshot/fd_snapshot_loader.c @@ -1,6 +1,4 @@ #include "fd_snapshot_loader.h" -#include "fd_snapshot.h" -#include "fd_snapshot_restore.h" #include "fd_snapshot_http.h" #include @@ -12,6 +10,8 @@ #include #include +#define FD_SNAPSHOT_LOADER_MAGIC (0xa78a73a69d33e6b1UL) + struct fd_snapshot_loader { ulong magic; @@ -46,12 +46,10 @@ struct fd_snapshot_loader { /* Hash and slot numbers from filename */ fd_snapshot_name_t name; -}; +}; typedef struct fd_snapshot_loader fd_snapshot_loader_t; -#define FD_SNAPSHOT_LOADER_MAGIC (0xa78a73a69d33e6b1UL) - ulong fd_snapshot_loader_align( void ) { return fd_ulong_max( alignof(fd_snapshot_loader_t), fd_zstd_dstream_align() ); @@ -198,9 +196,15 @@ fd_snapshot_loader_advance( fd_snapshot_loader_t * dumper ) { fd_tar_io_reader_t * vtar = dumper->vtar; int untar_err = fd_tar_io_reader_advance( vtar ); - if( untar_err==0 ) { /* ok */ } - else if( untar_err<0 ) { /* EOF */ return -1; } - else { + if( untar_err==0 ) { + /* Ok */ + } else if( untar_err==MANIFEST_DONE ) { + /* Finished reading the manifest for the first time. */ + return MANIFEST_DONE; + } else if( untar_err<0 ) { + /* EOF */ + return -1; + } else { FD_LOG_WARNING(( "Failed to load snapshot (%d-%s)", untar_err, fd_io_strerror( untar_err ) )); return untar_err; } diff --git a/src/flamenco/snapshot/fd_snapshot_loader.h b/src/flamenco/snapshot/fd_snapshot_loader.h index 3af44e3e1a..d3909f95e8 100644 --- a/src/flamenco/snapshot/fd_snapshot_loader.h +++ b/src/flamenco/snapshot/fd_snapshot_loader.h @@ -12,14 +12,9 @@ The loader is currently a single-threaded streaming pipeline. This is subject to change to the tile architecture in the future. */ -#include "../snapshot/fd_snapshot.h" -#include "../snapshot/fd_snapshot_restore.h" - -/* fd_snapshot_loader_t manages file descriptors and buffers used during - snapshot load. */ - -struct fd_snapshot_loader; -typedef struct fd_snapshot_loader fd_snapshot_loader_t; +#include "fd_snapshot.h" +#include "fd_snapshot_istream.h" +#include "fd_snapshot_restore.h" /* FD_SNAPSHOT_SRC_{...} specifies the type of snapshot source. */ @@ -28,6 +23,13 @@ typedef struct fd_snapshot_loader fd_snapshot_loader_t; /* fd_snapshot_src_t specifies the snapshot source. */ +FD_PROTOTYPES_BEGIN + +/* fd_snapshot_loader_t manages file descriptors and buffers used during + snapshot load. */ +struct fd_snapshot_loader; +typedef struct fd_snapshot_loader fd_snapshot_loader_t; + struct fd_snapshot_src { int type; union { @@ -49,8 +51,6 @@ struct fd_snapshot_src { typedef struct fd_snapshot_src fd_snapshot_src_t; -FD_PROTOTYPES_BEGIN - /* Constructor API for fd_snapshot_loader_t. */ ulong diff --git a/src/flamenco/snapshot/fd_snapshot_restore.c b/src/flamenco/snapshot/fd_snapshot_restore.c index 45d4fcb0f7..24ea11ce18 100644 --- a/src/flamenco/snapshot/fd_snapshot_restore.c +++ b/src/flamenco/snapshot/fd_snapshot_restore.c @@ -1,3 +1,4 @@ +#include "fd_snapshot_restore.h" #include "fd_snapshot_restore_private.h" #include "../../util/archive/fd_tar.h" #include "../runtime/fd_acc_mgr.h" @@ -308,7 +309,7 @@ fd_snapshot_restore_manifest( fd_snapshot_restore_t * restore ) { fd_snapshot_restore_discard_buf( restore ); restore->slot = slot; - restore->manifest_done = 1; + restore->manifest_done = MANIFEST_DONE_NOT_SEEN; return err; } @@ -691,6 +692,11 @@ fd_snapshot_restore_chunk( void * restore_, buf = buf_new; } + if( restore->manifest_done==MANIFEST_DONE_NOT_SEEN ) { + restore->manifest_done = MANIFEST_DONE_SEEN; + return MANIFEST_DONE; + } + return 0; } diff --git a/src/flamenco/snapshot/fd_snapshot_restore.h b/src/flamenco/snapshot/fd_snapshot_restore.h index 0b92b2a7c4..1adcf1a845 100644 --- a/src/flamenco/snapshot/fd_snapshot_restore.h +++ b/src/flamenco/snapshot/fd_snapshot_restore.h @@ -19,6 +19,15 @@ #include "../../util/archive/fd_tar.h" #include "../runtime/context/fd_exec_slot_ctx.h" +/* We want to exit out of snapshot loading once the manifest has been loaded in. + Once it has been seen, we don't want to exit out of snapshot loading if we + have already done so once. We exit out to allow for manifest data to be used + around the codebase. */ + +#define MANIFEST_DONE (INT_MAX) +#define MANIFEST_DONE_NOT_SEEN (1) +#define MANIFEST_DONE_SEEN (2) + /* fd_snapshot_restore_t implements a streaming TAR reader that parses archive records on the fly. Records include the manifest (at the start of the file), and account data. Notably, this object does on- diff --git a/src/flamenco/snapshot/fd_snapshot_restore_private.h b/src/flamenco/snapshot/fd_snapshot_restore_private.h index 41328c1cf1..9368a152e9 100644 --- a/src/flamenco/snapshot/fd_snapshot_restore_private.h +++ b/src/flamenco/snapshot/fd_snapshot_restore_private.h @@ -69,7 +69,7 @@ struct fd_snapshot_restore { ulong slot; /* Slot number the snapshot was taken at */ uchar state; - uchar manifest_done : 1; + uchar manifest_done; uchar status_cache_done : 1; uchar failed : 1; diff --git a/src/flamenco/snapshot/test_snapshot_restore.c b/src/flamenco/snapshot/test_snapshot_restore.c index c4fc50f405..dfc673ed12 100644 --- a/src/flamenco/snapshot/test_snapshot_restore.c +++ b/src/flamenco/snapshot/test_snapshot_restore.c @@ -100,7 +100,7 @@ main( int argc, # define NEW_RESTORE_POST_MANIFEST() __extension__({ \ fd_snapshot_restore_t * restore = fd_snapshot_restore_new( restore_mem, acc_mgr, NULL, _valloc, _dummy_ctx, cb_manifest, cb_status_cache ); \ - restore->manifest_done = 1; \ + restore->manifest_done = MANIFEST_DONE_SEEN; \ restore->slot = restore_slot; \ restore; \ }) @@ -198,11 +198,11 @@ main( int argc, _cb_v_manifest = NULL; _cb_retcode = 0; FD_TEST( 0==fd_snapshot_restore_file( restore, &meta, data_sz ) ); - FD_TEST( 0==fd_snapshot_restore_chunk( restore, data, data_sz ) ); - FD_TEST( _cb_v_ctx == _dummy_ctx ); - FD_TEST( _cb_v_manifest != NULL ); - FD_TEST( restore->manifest_done == 1 ); - FD_TEST( restore->slot == 3UL ); + FD_TEST( MANIFEST_DONE==fd_snapshot_restore_chunk( restore, data, data_sz ) ); + FD_TEST( _cb_v_ctx == _dummy_ctx ); + FD_TEST( _cb_v_manifest != NULL ); + FD_TEST( restore->manifest_done == MANIFEST_DONE_SEEN ); + FD_TEST( restore->slot == 3UL ); fd_snapshot_restore_delete( restore ); fd_scratch_pop(); @@ -281,7 +281,7 @@ main( int argc, _cb_v_manifest = NULL; _cb_retcode = 0; FD_TEST( 0==fd_snapshot_restore_file( restore, &meta, manifest_sz + 1 ) ); - FD_TEST( 0==fd_snapshot_restore_chunk( restore, data, manifest_sz + 1 ) ); + FD_TEST( MANIFEST_DONE==fd_snapshot_restore_chunk( restore, data, manifest_sz + 1 ) ); FD_TEST( _cb_v_ctx == _dummy_ctx ); /* callback must have been successful */ FD_TEST( _cb_v_manifest != NULL ); FD_TEST( 0==fd_snapshot_restore_chunk( restore, data, manifest_sz + 1 ) ); @@ -380,8 +380,8 @@ main( int argc, do { fd_snapshot_restore_t * restore = NEW_RESTORE_POST_MANIFEST(); FD_TEST( restore ); - restore->manifest_done = 1; - restore->funk_txn = fd_funk_txn_prepare( funk, NULL, xid, 0 ); + restore->manifest_done = MANIFEST_DONE_SEEN; + restore->funk_txn = fd_funk_txn_prepare( funk, NULL, xid, 0 ); FD_TEST( restore->funk_txn ); _set_accv_sz( restore, /* slot */ 1UL, /* id */ 1UL, /* sz */ sizeof(fd_solana_account_hdr_t) ); @@ -409,7 +409,7 @@ main( int argc, do { fd_snapshot_restore_t * restore = NEW_RESTORE_POST_MANIFEST(); FD_TEST( restore ); - restore->manifest_done = 1; + restore->manifest_done = MANIFEST_DONE_SEEN; restore->funk_txn = fd_funk_txn_prepare( funk, NULL, xid, 0 ); /* Insert a dead account (slot 9) */ @@ -455,7 +455,7 @@ main( int argc, do { fd_snapshot_restore_t * restore = NEW_RESTORE_POST_MANIFEST(); FD_TEST( restore ); - restore->manifest_done = 1; + restore->manifest_done = MANIFEST_DONE_SEEN; restore->funk_txn = fd_funk_txn_prepare( funk, NULL, xid, 0 ); /* Insert an account (key 9, slot 9) */ @@ -545,7 +545,7 @@ main( int argc, do { fd_snapshot_restore_t * restore = NEW_RESTORE_POST_MANIFEST(); FD_TEST( restore ); - restore->manifest_done = 1; + restore->manifest_done = MANIFEST_DONE_SEEN; ulong accv_sz = 2 * sizeof(fd_solana_account_hdr_t) + 2UL; _set_accv_sz( restore, /* slot */ 1UL, /* id */ 1UL, accv_sz ); diff --git a/src/util/archive/fd_tar.h b/src/util/archive/fd_tar.h index d084fea5cf..09de84c442 100644 --- a/src/util/archive/fd_tar.h +++ b/src/util/archive/fd_tar.h @@ -218,12 +218,16 @@ fd_tar_reader_delete( fd_tar_reader_t * reader ); Returns -1 on end-of-file. On failure, returns positive errno compatible error code. In case of error, caller should delete reader and must not issue any more fd_tar_read calls. Suitable as a - fd_decompress_cb_t callback. */ + fd_decompress_cb_t callback. If the underlying functions returns track_err + at any point, after fd_tar_read has processed the end of the data buffer, + we will proceed to return track_err assuming no other errors have been + thrown. Pass in 0 to not use this functionality. */ int fd_tar_read( void * reader, uchar const * data, - ulong data_sz ); + ulong data_sz, + int track_err ); /* Streaming writer ***************************************************/ diff --git a/src/util/archive/fd_tar_reader.c b/src/util/archive/fd_tar_reader.c index 88600d4f1c..6df7811c53 100644 --- a/src/util/archive/fd_tar_reader.c +++ b/src/util/archive/fd_tar_reader.c @@ -128,13 +128,15 @@ fd_tar_read_data( fd_tar_reader_t * reader, reader->file_sz -= chunk_sz; *pcur = cur; - return fd_int_if( err, EIO, 0 ); + + return err; } int fd_tar_read( void * const reader_, uchar const * const data, - ulong const data_sz ) { + ulong const data_sz, + int const track_err ) { fd_tar_reader_t * reader = reader_; ulong const pos = reader->pos; @@ -142,10 +144,17 @@ fd_tar_read( void * const reader_, uchar const * cur = data; uchar const * end = cur+data_sz; + /* We want to return this return value if it has been seen at any point, + but we want to make sure the reader has processed the entire data buffer. */ + int seen_tracked_err = 0; + while( cur!=end ) { if( reader->file_sz ) { int err = fd_tar_read_data( reader, &cur, end ); - if( FD_UNLIKELY( !!err ) ) return err; + if( FD_UNLIKELY( !!err && err!=track_err ) ) return err; + if( err==track_err ) { + seen_tracked_err = 1; + } reader->pos = pos + (ulong)( cur-data ); } if( !reader->file_sz ) { @@ -155,6 +164,10 @@ fd_tar_read( void * const reader_, } } + if( seen_tracked_err ) { + return track_err; + } + return 0; } diff --git a/src/util/archive/fuzz_tar.c b/src/util/archive/fuzz_tar.c index 748e135543..a7a3651e90 100644 --- a/src/util/archive/fuzz_tar.c +++ b/src/util/archive/fuzz_tar.c @@ -66,7 +66,7 @@ LLVMFuzzerTestOneInput( uchar const * data, FD_TEST( reader ); /* Read all in one */ - int err1 = fd_tar_read( reader, data, size ); + int err1 = fd_tar_read( reader, data, size, 0 ); FD_TEST( _reader==fd_tar_reader_delete( reader ) ); @@ -77,7 +77,7 @@ LLVMFuzzerTestOneInput( uchar const * data, int err2 = 0; for( ulong i=0UL; i