Skip to content

Commit 2f7580b

Browse files
snapshots: send manifest
1 parent 0d8386f commit 2f7580b

File tree

8 files changed

+113
-63
lines changed

8 files changed

+113
-63
lines changed

src/app/firedancer-dev/commands/backtest.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ backtest_topo( config_t * config ) {
311311

312312
/* Replay decoded manifest dcache topo obj */
313313
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
314-
fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
314+
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
315315
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );
316316

317317
fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );

src/app/firedancer-dev/commands/snapshot_load.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ snapshot_load_topo( config_t * config,
4646
manifest */
4747
fd_topob_wksp( topo, "replay_manif" );
4848
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
49-
fd_pod_insertf_ulong( topo->props, 1UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
49+
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
5050
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );
5151

5252
/* read() tile */

src/app/firedancer/topology.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ fd_topo_initialize( config_t * config ) {
381381

382382
/* Replay decoded manifest dcache topo obj */
383383
fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
384-
fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
384+
fd_pod_insertf_ulong( topo->props, (4 * 1UL << 30UL /* gib */), "obj.%lu.data_sz", replay_manifest_dcache->id );
385385
fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );
386386

387387
ushort parsed_tile_to_cpu[ FD_TILE_MAX ];

src/discof/replay/fd_replay_tile.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,15 @@ restore_slot_ctx( fd_replay_tile_ctx_t * ctx,
656656

657657
fd_solana_manifest_global_t * manifest_global
658658
= (fd_solana_manifest_global_t *)fd_chunk_to_laddr( fd_wksp_containing( ctx->manifest_dcache ), chunk );
659+
660+
/* If the bank already exists, that means that we have already
661+
restored the bank for this slot. */
662+
fd_bank_t * bank = fd_banks_get_bank( ctx->slot_ctx->banks, manifest_global->bank.slot );
663+
if( FD_UNLIKELY( !!bank ) ) {
664+
FD_LOG_NOTICE(( "The bank for slot %lu already exists, skipping slot context restore", manifest_global->bank.slot ));
665+
return;
666+
}
667+
659668
fd_exec_slot_ctx_t * recovered_slot_ctx = fd_exec_slot_ctx_recover( ctx->slot_ctx,
660669
manifest_global,
661670
ctx->runtime_spad );
@@ -985,6 +994,7 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
985994
}
986995
case FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL:
987996
case FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL: {
997+
FD_LOG_NOTICE(( "Received decoded global snapshot manifest message" ));
988998
/* We may either receive a full snapshot manifest or an
989999
incremental snapshot manifest. Note that this external message
9901000
id is only used temporarily because replay cannot yet receive

src/discof/restore/fd_snapin_tile.c

Lines changed: 82 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,20 @@ struct fd_snapin_tile {
3838
manifest. */
3939
fd_lthash_value_t lthash;
4040

41+
/* stem pointer for snapshot parser callbacks */
42+
fd_stem_context_t * stem;
43+
4144
/* A shared dcache object between snapin and replay that holds the
4245
decoded solana manifest.
4346
TODO: remove when replay can receive the snapshot manifest. */
44-
uchar * replay_manifest_dcache;
45-
ulong replay_manifest_dcache_obj_id;
46-
47-
/* TODO: remove when replay can receive the snapshot manifest. */
48-
ulong manifest_sz;
47+
struct {
48+
fd_wksp_t * wksp;
49+
uchar * dcache;
50+
ulong chunk0;
51+
ulong wmark;
52+
ulong chunk;
53+
ulong obj_id;
54+
} replay_manifest_dcache;
4955

5056
struct {
5157
fd_snapshot_parser_metrics_t full;
@@ -106,10 +112,53 @@ metrics_write( fd_snapin_tile_t * ctx ) {
106112
}
107113

108114
static void
109-
save_manifest( fd_snapshot_parser_t * parser,
110-
void * _ctx,
111-
fd_solana_manifest_global_t * manifest,
112-
ulong manifest_sz ) {
115+
send_manifest( fd_snapin_tile_t * ctx,
116+
ulong manifest_sz ) {
117+
ulong sig = 0UL;
118+
ulong external_sig = 0UL;
119+
if( ctx->full ) {
120+
sig = FD_FULL_SNAPSHOT_MANIFEST;
121+
external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL;
122+
} else {
123+
sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST;
124+
external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL;
125+
}
126+
127+
/* Send snapshot manifest message over snap_out link */
128+
fd_stem_publish( ctx->stem,
129+
0UL,
130+
sig,
131+
ctx->manifest_out.chunk,
132+
sizeof(fd_snapshot_manifest_t),
133+
0UL,
134+
0UL,
135+
0UL );
136+
ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk,
137+
sizeof(fd_snapshot_manifest_t),
138+
ctx->manifest_out.chunk0,
139+
ctx->manifest_out.wmark );
140+
141+
/* send manifest over replay manifest dcache */
142+
fd_stem_publish( ctx->stem,
143+
0UL,
144+
external_sig,
145+
ctx->replay_manifest_dcache.chunk,
146+
manifest_sz,
147+
0UL,
148+
ctx->replay_manifest_dcache.obj_id,
149+
0UL );
150+
ctx->replay_manifest_dcache.chunk = fd_dcache_compact_next( ctx->replay_manifest_dcache.chunk,
151+
manifest_sz,
152+
ctx->replay_manifest_dcache.chunk0,
153+
ctx->replay_manifest_dcache.wmark );
154+
FD_TEST( ctx->replay_manifest_dcache.chunk <= ctx->replay_manifest_dcache.wmark );
155+
}
156+
157+
static void
158+
handle_manifest( fd_snapshot_parser_t * parser,
159+
void * _ctx,
160+
fd_solana_manifest_global_t * manifest,
161+
ulong manifest_sz ) {
113162
(void)parser;
114163
fd_snapin_tile_t * ctx = _ctx;
115164

@@ -119,8 +168,13 @@ save_manifest( fd_snapshot_parser_t * parser,
119168
FD_LOG_NOTICE(( "Snapshot manifest loaded for slot %lu", ssmanifest->slot ));
120169

121170
/* Send decoded manifest to replay */
122-
fd_memcpy( ctx->replay_manifest_dcache, manifest, manifest_sz );
123-
ctx->manifest_sz = manifest_sz;
171+
uchar * next_dcache_mem = fd_chunk_to_laddr( ctx->replay_manifest_dcache.wksp,
172+
ctx->replay_manifest_dcache.chunk );
173+
fd_memcpy( next_dcache_mem,
174+
manifest,
175+
manifest_sz );
176+
177+
send_manifest( ctx, manifest_sz );
124178
}
125179

126180
static int
@@ -177,10 +231,11 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
177231
}
178232

179233
static void
180-
snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
234+
snapshot_copy_acc_data( fd_snapshot_parser_t * parser,
181235
void * _ctx,
182236
uchar const * buf,
183237
ulong data_sz ) {
238+
(void)parser;
184239
fd_snapin_tile_t * ctx = fd_type_pun( _ctx );
185240

186241
if( ctx->acc_data ) {
@@ -190,8 +245,9 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
190245
}
191246

192247
static void
193-
snapshot_reset_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
248+
snapshot_reset_acc_data( fd_snapshot_parser_t * parser,
194249
void * _ctx ) {
250+
(void)parser;
195251
fd_snapin_tile_t * ctx = fd_type_pun( _ctx );
196252
ctx->acc_data = NULL;
197253
}
@@ -214,29 +270,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
214270
/* TODO: Assert that hard reset suceeded */
215271
}
216272

217-
static void
218-
send_manifest( fd_snapin_tile_t * ctx,
219-
fd_stem_context_t * stem ) {
220-
/* Assumes the manifest is already mem copied into the snap_out
221-
dcache and the replay_manifest_dcache from the save_manifest
222-
callback. */
223-
FD_TEST( ctx->manifest_sz );
224-
225-
ulong sig = ctx->full ? FD_FULL_SNAPSHOT_MANIFEST : FD_INCREMENTAL_SNAPSHOT_MANIFEST;
226-
ulong external_sig = ctx->full ? FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL : FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL;
227-
228-
/* Send snapshot manifest message over snap_out link */
229-
fd_stem_publish( stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
230-
ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk,
231-
sizeof(fd_snapshot_manifest_t),
232-
ctx->manifest_out.chunk0,
233-
ctx->manifest_out.wmark );
234-
235-
/* send manifest over replay manifest dcache */
236-
ulong chunk = fd_dcache_compact_chunk0( fd_wksp_containing( ctx->replay_manifest_dcache ), ctx->replay_manifest_dcache );
237-
fd_stem_publish( stem, 0UL, external_sig, chunk, ctx->manifest_sz, 0UL, ctx->replay_manifest_dcache_obj_id, 0UL );
238-
}
239-
240273
static void
241274
transition_malformed( fd_snapin_tile_t * ctx,
242275
fd_stem_context_t * stem ) {
@@ -259,6 +292,8 @@ handle_data_frag( fd_snapin_tile_t * ctx,
259292
return;
260293
}
261294

295+
ctx->stem = stem;
296+
262297
uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
263298
uchar const * const chunk_end = chunk_start + sz;
264299
uchar const * cur = chunk_start;
@@ -312,10 +347,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
312347
/* Publish any outstanding funk txn. */
313348
if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
314349

315-
/* Once the snapshot is fully loaded, we can send the manifest
316-
message over. */
317-
send_manifest( ctx, stem );
318-
319350
/* Notify consumers of manifest out that the snapshot is fully
320351
loaded. */
321352
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
@@ -377,7 +408,7 @@ unprivileged_init( fd_topo_t * topo,
377408

378409
fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL;
379410
if( 0==strcmp( topo->links[tile->out_link_id[ 0UL ]].name, "snap_out" ) ) {
380-
manifest_cb = save_manifest;
411+
manifest_cb = handle_manifest;
381412
}
382413

383414
ctx->parser = fd_snapshot_parser_new( parser_mem,
@@ -395,10 +426,6 @@ unprivileged_init( fd_topo_t * topo,
395426

396427
fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
397428

398-
ctx->replay_manifest_dcache = fd_topo_obj_laddr( topo, tile->snapin.manifest_dcache_obj_id );
399-
ctx->replay_manifest_dcache_obj_id = tile->snapin.manifest_dcache_obj_id;
400-
ctx->manifest_sz = 0UL;
401-
402429
if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
403430
if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
404431
if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
@@ -409,6 +436,16 @@ unprivileged_init( fd_topo_t * topo,
409436
ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
410437
ctx->manifest_out.chunk = ctx->manifest_out.chunk0;
411438

439+
/* join replay manifest dcache */
440+
ctx->replay_manifest_dcache.dcache = fd_topo_obj_laddr( topo, tile->snapin.manifest_dcache_obj_id );
441+
ctx->replay_manifest_dcache.wksp = fd_wksp_containing( ctx->replay_manifest_dcache.dcache );
442+
ctx->replay_manifest_dcache.obj_id = tile->snapin.manifest_dcache_obj_id;
443+
ctx->replay_manifest_dcache.chunk0 = fd_dcache_compact_chunk0( ctx->replay_manifest_dcache.wksp, ctx->replay_manifest_dcache.dcache );
444+
ctx->replay_manifest_dcache.chunk = ctx->replay_manifest_dcache.chunk0;
445+
ctx->replay_manifest_dcache.wmark = fd_dcache_compact_wmark( ctx->replay_manifest_dcache.wksp,
446+
ctx->replay_manifest_dcache.dcache,
447+
writer_link->mtu);
448+
412449
fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
413450
fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
414451
ctx->in.wksp = in_wksp->wksp;;

src/discof/restore/utils/fd_snapshot_parser.c

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,8 @@ fd_snapshot_parser_hdr_read_is_complete( fd_snapshot_parser_t const * self ) {
337337

338338
static uchar const *
339339
fd_snapshot_parser_read_buffered( fd_snapshot_parser_t * self,
340-
uchar const * buf,
341-
ulong bufsz ) {
340+
uchar const * buf,
341+
ulong bufsz ) {
342342
/* Should not be called if read is complete */
343343
FD_TEST( self->buf_ctr < self->buf_sz );
344344

@@ -414,8 +414,8 @@ fd_snapshot_parser_restore_account_hdr( fd_snapshot_parser_t * self ) {
414414

415415
static uchar const *
416416
fd_snapshot_parser_read_account_hdr_chunk( fd_snapshot_parser_t * self,
417-
uchar const * buf,
418-
ulong bufsz ) {
417+
uchar const * buf,
418+
ulong bufsz ) {
419419
if( !self->accv_sz ) {
420420
/* Reached end of AppendVec */
421421
self->state = SNAP_STATE_IGNORE;
@@ -429,25 +429,18 @@ fd_snapshot_parser_read_account_hdr_chunk( fd_snapshot_parser_t * self,
429429
self->accv_sz -= hdr_read;
430430
bufsz -= hdr_read;
431431

432-
// ulong peek_sz = 0UL;
433432
if( FD_LIKELY( fd_snapshot_parser_hdr_read_is_complete( self ) ) ) {
434433
if( FD_UNLIKELY( 0!=fd_snapshot_parser_restore_account_hdr( self ) ) ) {
435434
return buf; /* parse error */
436435
}
437-
// peek_sz = fd_ulong_min( self->acc_rem, bufsz );
438436
}
439-
440-
// self->acc_rem -= peek_sz;
441-
// self->accv_sz -= peek_sz;
442-
// buf_next += peek_sz;
443-
444437
return buf_next;
445438
}
446439

447440
static uchar const *
448441
fd_snapshot_parser_read_account_chunk( fd_snapshot_parser_t * self,
449-
uchar const * buf,
450-
ulong bufsz ) {
442+
uchar const * buf,
443+
ulong bufsz ) {
451444

452445
ulong chunk_sz = fd_ulong_min( self->acc_rem, bufsz );
453446
if( FD_UNLIKELY( chunk_sz > self->accv_sz ) )

src/discof/restore/utils/fd_snapshot_parser.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ fd_snapshot_parser_reset( fd_snapshot_parser_t * self ) {
165165
}
166166

167167
static inline fd_snapshot_parser_t *
168-
fd_snapshot_parser_new( void * mem,
168+
fd_snapshot_parser_new( void * mem,
169169
fd_snapshot_parser_process_manifest_fn_t manifest_cb,
170170
fd_snapshot_process_acc_hdr_fn_t acc_hdr_cb,
171171
fd_snapshot_process_acc_data_fn_t acc_data_cb,

src/flamenco/runtime/context/fd_exec_slot_ctx.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,18 @@ fd_exec_slot_ctx_t *
168168
fd_exec_slot_ctx_recover( fd_exec_slot_ctx_t * slot_ctx,
169169
fd_solana_manifest_global_t const * manifest,
170170
fd_spad_t * runtime_spad ) {
171+
fd_bank_t * bank = fd_banks_get_bank( slot_ctx->banks, manifest->bank.slot );
172+
if( FD_UNLIKELY( !!bank ) ) {
173+
/* If the bank already exists, that means that we have already
174+
restored the bank for this slot. Clear the bank to restore the new
175+
manifest. */
176+
fd_banks_clear_bank( slot_ctx->banks, bank );
177+
} else {
178+
bank = fd_banks_clone_from_parent( slot_ctx->banks, manifest->bank.slot, 0UL );
179+
}
180+
181+
slot_ctx->bank = bank;
171182

172-
slot_ctx->bank = fd_banks_clone_from_parent( slot_ctx->banks, manifest->bank.slot, 0UL );
173183
if( FD_UNLIKELY( !slot_ctx->bank ) ) {
174184
FD_LOG_CRIT(( "fd_banks_clone_from_parent failed" ));
175185
}

0 commit comments

Comments
 (0)