Skip to content

snapshots: send manifest #5631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/backtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ backtest_topo( config_t * config ) {

/* Replay decoded manifest dcache topo obj */
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );

fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/snapshot_load.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ snapshot_load_topo( config_t * config,
manifest */
fd_topob_wksp( topo, "replay_manif" );
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
fd_pod_insertf_ulong( topo->props, 1UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );

/* read() tile */
Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ fd_topo_initialize( config_t * config ) {

/* Replay decoded manifest dcache topo obj */
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );

ushort parsed_tile_to_cpu[ FD_TILE_MAX ];
Expand Down
2 changes: 2 additions & 0 deletions src/discof/replay/fd_replay_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ restore_slot_ctx( fd_replay_tile_ctx_t * ctx,

fd_solana_manifest_global_t * manifest_global
= (fd_solana_manifest_global_t *)fd_chunk_to_laddr( fd_wksp_containing( ctx->manifest_dcache ), chunk );

fd_exec_slot_ctx_t * recovered_slot_ctx = fd_exec_slot_ctx_recover( ctx->slot_ctx,
manifest_global,
ctx->runtime_spad );
Expand Down Expand Up @@ -985,6 +986,7 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
}
case FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL:
case FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL: {
FD_LOG_NOTICE(( "Received decoded global snapshot manifest message" ));
/* We may either receive a full snapshot manifest or an
incremental snapshot manifest. Note that this external message
id is only used temporarily because replay cannot yet receive
Expand Down
127 changes: 82 additions & 45 deletions src/discof/restore/fd_snapin_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ struct fd_snapin_tile {
manifest. */
fd_lthash_value_t lthash;

/* stem pointer for snapshot parser callbacks */
fd_stem_context_t * stem;

/* A shared dcache object between snapin and replay that holds the
decoded solana manifest.
TODO: remove when replay can receive the snapshot manifest. */
uchar * replay_manifest_dcache;
ulong replay_manifest_dcache_obj_id;

/* TODO: remove when replay can receive the snapshot manifest. */
ulong manifest_sz;
struct {
fd_wksp_t * wksp;
uchar * dcache;
ulong chunk0;
ulong wmark;
ulong chunk;
ulong obj_id;
} replay_manifest_dcache;

struct {
fd_snapshot_parser_metrics_t full;
Expand Down Expand Up @@ -106,10 +112,53 @@ metrics_write( fd_snapin_tile_t * ctx ) {
}

static void
save_manifest( fd_snapshot_parser_t * parser,
void * _ctx,
fd_solana_manifest_global_t * manifest,
ulong manifest_sz ) {
send_manifest( fd_snapin_tile_t * ctx,
ulong manifest_sz ) {
/* Publishes two frags on ctx->stem announcing a decoded manifest of
manifest_sz bytes: one carrying the internal signature and one
carrying the *_EXTERNAL signature.  ctx->full selects between the
full and incremental snapshot variants of both signatures.

This function does not copy any manifest bytes itself; the visible
caller (handle_manifest) copies the manifest to the address of
ctx->replay_manifest_dcache.chunk before calling.  It also relies on
ctx->stem having been set (handle_data_frag assigns it). */
ulong sig = 0UL;
ulong external_sig = 0UL;
if( ctx->full ) {
sig = FD_FULL_SNAPSHOT_MANIFEST;
external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL;
} else {
sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST;
external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL;
}

/* Send snapshot manifest message over snap_out link.  The published
size is always sizeof(fd_snapshot_manifest_t), independent of
manifest_sz. */
fd_stem_publish( ctx->stem,
0UL,
sig,
ctx->manifest_out.chunk,
sizeof(fd_snapshot_manifest_t),
0UL,
0UL,
0UL );
/* Advance the snap_out chunk cursor past the frag just published. */
ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk,
sizeof(fd_snapshot_manifest_t),
ctx->manifest_out.chunk0,
ctx->manifest_out.wmark );

/* Send manifest over replay manifest dcache.  The dcache object id is
passed in the seventh argument so the receiver can identify the
shared dcache.
NOTE(review): presumably this slot is the tsorig field of
fd_stem_publish being reused to carry the id -- confirm against the
fd_stem_publish signature. */
fd_stem_publish( ctx->stem,
0UL,
external_sig,
ctx->replay_manifest_dcache.chunk,
manifest_sz,
0UL,
ctx->replay_manifest_dcache.obj_id,
0UL );
/* Advance the replay manifest dcache cursor by manifest_sz so a later
manifest is written to a different chunk. */
ctx->replay_manifest_dcache.chunk = fd_dcache_compact_next( ctx->replay_manifest_dcache.chunk,
manifest_sz,
ctx->replay_manifest_dcache.chunk0,
ctx->replay_manifest_dcache.wmark );
/* Sanity check: the advanced cursor must not exceed the watermark. */
FD_TEST( ctx->replay_manifest_dcache.chunk <= ctx->replay_manifest_dcache.wmark );
}

static void
handle_manifest( fd_snapshot_parser_t * parser,
void * _ctx,
fd_solana_manifest_global_t * manifest,
ulong manifest_sz ) {
(void)parser;
fd_snapin_tile_t * ctx = _ctx;

Expand All @@ -119,8 +168,13 @@ save_manifest( fd_snapshot_parser_t * parser,
FD_LOG_NOTICE(( "Snapshot manifest loaded for slot %lu", ssmanifest->slot ));

/* Send decoded manifest to replay */
fd_memcpy( ctx->replay_manifest_dcache, manifest, manifest_sz );
ctx->manifest_sz = manifest_sz;
uchar * next_dcache_mem = fd_chunk_to_laddr( ctx->replay_manifest_dcache.wksp,
ctx->replay_manifest_dcache.chunk );
fd_memcpy( next_dcache_mem,
manifest,
manifest_sz );

send_manifest( ctx, manifest_sz );
}

static int
Expand Down Expand Up @@ -177,10 +231,11 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
}

static void
snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
snapshot_copy_acc_data( fd_snapshot_parser_t * parser,
void * _ctx,
uchar const * buf,
ulong data_sz ) {
(void)parser;
fd_snapin_tile_t * ctx = fd_type_pun( _ctx );

if( ctx->acc_data ) {
Expand All @@ -190,8 +245,9 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
}

static void
snapshot_reset_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
snapshot_reset_acc_data( fd_snapshot_parser_t * parser,
void * _ctx ) {
(void)parser;
Comment on lines -193 to +250
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change FD_PARAM_UNUSED to (void)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought being consistent with using (void) was better. Let me know if you disagree.

fd_snapin_tile_t * ctx = fd_type_pun( _ctx );
ctx->acc_data = NULL;
}
Expand All @@ -214,29 +270,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
/* TODO: Assert that hard reset succeeded */
}

static void
send_manifest( fd_snapin_tile_t * ctx,
fd_stem_context_t * stem ) {
/* Publishes the manifest-ready notifications on stem: one frag with
the internal signature and one with the *_EXTERNAL signature.

Assumes the manifest is already mem copied into the snap_out
dcache and the replay_manifest_dcache from the save_manifest
callback. */
/* ctx->manifest_sz is initialized to 0 and set by save_manifest, so a
zero value here means no manifest was saved. */
FD_TEST( ctx->manifest_sz );

/* ctx->full selects the full vs. incremental snapshot signatures. */
ulong sig = ctx->full ? FD_FULL_SNAPSHOT_MANIFEST : FD_INCREMENTAL_SNAPSHOT_MANIFEST;
ulong external_sig = ctx->full ? FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL : FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL;

/* Send snapshot manifest message over snap_out link, then advance the
snap_out chunk cursor past the published frag. */
fd_stem_publish( stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk,
sizeof(fd_snapshot_manifest_t),
ctx->manifest_out.chunk0,
ctx->manifest_out.wmark );

/* Send manifest over replay manifest dcache.  The chunk is recomputed
as the dcache's chunk0 on every call and never advanced, so every
publish refers to the same location.  The dcache object id is
passed in the seventh argument.
NOTE(review): presumably that slot is the tsorig field being reused
to carry the id -- confirm against the fd_stem_publish signature. */
ulong chunk = fd_dcache_compact_chunk0( fd_wksp_containing( ctx->replay_manifest_dcache ), ctx->replay_manifest_dcache );
fd_stem_publish( stem, 0UL, external_sig, chunk, ctx->manifest_sz, 0UL, ctx->replay_manifest_dcache_obj_id, 0UL );
}

static void
transition_malformed( fd_snapin_tile_t * ctx,
fd_stem_context_t * stem ) {
Expand All @@ -259,6 +292,8 @@ handle_data_frag( fd_snapin_tile_t * ctx,
return;
}

ctx->stem = stem;

uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
uchar const * const chunk_end = chunk_start + sz;
uchar const * cur = chunk_start;
Expand Down Expand Up @@ -312,10 +347,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
/* Publish any outstanding funk txn. */
if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );

/* Once the snapshot is fully loaded, we can send the manifest
message over. */
send_manifest( ctx, stem );

/* Notify consumers of manifest out that the snapshot is fully
loaded. */
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
Expand Down Expand Up @@ -377,7 +408,7 @@ unprivileged_init( fd_topo_t * topo,

fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL;
if( 0==strcmp( topo->links[tile->out_link_id[ 0UL ]].name, "snap_out" ) ) {
manifest_cb = save_manifest;
manifest_cb = handle_manifest;
}

ctx->parser = fd_snapshot_parser_new( parser_mem,
Expand All @@ -395,10 +426,6 @@ unprivileged_init( fd_topo_t * topo,

fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );

ctx->replay_manifest_dcache = fd_topo_obj_laddr( topo, tile->snapin.manifest_dcache_obj_id );
ctx->replay_manifest_dcache_obj_id = tile->snapin.manifest_dcache_obj_id;
ctx->manifest_sz = 0UL;

if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
Expand All @@ -409,6 +436,16 @@ unprivileged_init( fd_topo_t * topo,
ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
ctx->manifest_out.chunk = ctx->manifest_out.chunk0;

/* join replay manifest dcache */
ctx->replay_manifest_dcache.dcache = fd_topo_obj_laddr( topo, tile->snapin.manifest_dcache_obj_id );
ctx->replay_manifest_dcache.wksp = fd_wksp_containing( ctx->replay_manifest_dcache.dcache );
ctx->replay_manifest_dcache.obj_id = tile->snapin.manifest_dcache_obj_id;
ctx->replay_manifest_dcache.chunk0 = fd_dcache_compact_chunk0( ctx->replay_manifest_dcache.wksp, ctx->replay_manifest_dcache.dcache );
ctx->replay_manifest_dcache.chunk = ctx->replay_manifest_dcache.chunk0;
ctx->replay_manifest_dcache.wmark = fd_dcache_compact_wmark( ctx->replay_manifest_dcache.wksp,
ctx->replay_manifest_dcache.dcache,
writer_link->mtu);

fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
ctx->in.wksp = in_wksp->wksp;;
Expand Down
19 changes: 6 additions & 13 deletions src/discof/restore/utils/fd_snapshot_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ fd_snapshot_parser_hdr_read_is_complete( fd_snapshot_parser_t const * self ) {

static uchar const *
fd_snapshot_parser_read_buffered( fd_snapshot_parser_t * self,
uchar const * buf,
ulong bufsz ) {
uchar const * buf,
ulong bufsz ) {
/* Should not be called if read is complete */
FD_TEST( self->buf_ctr < self->buf_sz );

Expand Down Expand Up @@ -414,8 +414,8 @@ fd_snapshot_parser_restore_account_hdr( fd_snapshot_parser_t * self ) {

static uchar const *
fd_snapshot_parser_read_account_hdr_chunk( fd_snapshot_parser_t * self,
uchar const * buf,
ulong bufsz ) {
uchar const * buf,
ulong bufsz ) {
if( !self->accv_sz ) {
/* Reached end of AppendVec */
self->state = SNAP_STATE_IGNORE;
Expand All @@ -429,25 +429,18 @@ fd_snapshot_parser_read_account_hdr_chunk( fd_snapshot_parser_t * self,
self->accv_sz -= hdr_read;
bufsz -= hdr_read;

// ulong peek_sz = 0UL;
if( FD_LIKELY( fd_snapshot_parser_hdr_read_is_complete( self ) ) ) {
if( FD_UNLIKELY( 0!=fd_snapshot_parser_restore_account_hdr( self ) ) ) {
return buf; /* parse error */
}
// peek_sz = fd_ulong_min( self->acc_rem, bufsz );
}

// self->acc_rem -= peek_sz;
// self->accv_sz -= peek_sz;
// buf_next += peek_sz;

return buf_next;
}

static uchar const *
fd_snapshot_parser_read_account_chunk( fd_snapshot_parser_t * self,
uchar const * buf,
ulong bufsz ) {
uchar const * buf,
ulong bufsz ) {

ulong chunk_sz = fd_ulong_min( self->acc_rem, bufsz );
if( FD_UNLIKELY( chunk_sz > self->accv_sz ) )
Expand Down
2 changes: 1 addition & 1 deletion src/discof/restore/utils/fd_snapshot_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ fd_snapshot_parser_reset( fd_snapshot_parser_t * self ) {
}

static inline fd_snapshot_parser_t *
fd_snapshot_parser_new( void * mem,
fd_snapshot_parser_new( void * mem,
fd_snapshot_parser_process_manifest_fn_t manifest_cb,
fd_snapshot_process_acc_hdr_fn_t acc_hdr_cb,
fd_snapshot_process_acc_data_fn_t acc_data_cb,
Expand Down
12 changes: 11 additions & 1 deletion src/flamenco/runtime/context/fd_exec_slot_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,18 @@ fd_exec_slot_ctx_t *
fd_exec_slot_ctx_recover( fd_exec_slot_ctx_t * slot_ctx,
fd_solana_manifest_global_t const * manifest,
fd_spad_t * runtime_spad ) {
fd_bank_t * bank = fd_banks_get_bank( slot_ctx->banks, manifest->bank.slot );
if( FD_UNLIKELY( !!bank ) ) {
/* If the bank already exists, that means that we have already
restored the bank for this slot. Clear the bank to restore the new
manifest. */
fd_banks_clear_bank( slot_ctx->banks, bank );
} else {
bank = fd_banks_clone_from_parent( slot_ctx->banks, manifest->bank.slot, 0UL );
}

slot_ctx->bank = bank;

slot_ctx->bank = fd_banks_clone_from_parent( slot_ctx->banks, manifest->bank.slot, 0UL );
if( FD_UNLIKELY( !slot_ctx->bank ) ) {
FD_LOG_CRIT(( "fd_banks_clone_from_parent failed" ));
}
Expand Down
1 change: 0 additions & 1 deletion src/flamenco/runtime/fd_bank.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ fd_banks_get_bank( fd_banks_t * banks, ulong slot ) {
bank = fd_banks_map_ele_query( bank_map, &slot, NULL, bank_pool );
if( FD_UNLIKELY( !bank ) ) {
FD_LOG_WARNING(( "Failed to get bank" ));
return NULL;
}

fd_rwlock_unread( &banks->rwlock );
Expand Down
Loading