@@ -82,11 +82,14 @@ struct fd_snapin_tile {
82
82
/* A shared dcache object between snapin and replay that holds the
83
83
decoded solana manifest.
84
84
TODO: remove when replay can receive the snapshot manifest. */
85
- uchar * replay_manifest_dcache ;
86
- ulong replay_manifest_dcache_obj_id ;
87
-
88
- /* TODO: remove when replay can receive the snapshot manifest. */
89
- ulong manifest_sz ;
85
+ struct {
86
+ fd_wksp_t * wksp ;
87
+ uchar * dcache ;
88
+ ulong chunk0 ;
89
+ ulong wmark ;
90
+ ulong chunk ;
91
+ ulong obj_id ;
92
+ } replay_manifest_dcache ;
90
93
91
94
int shutdown ;
92
95
@@ -117,13 +120,57 @@ fd_snapin_accumulate_metrics( fd_snapin_tile_t * ctx ) {
117
120
}
118
121
}
119
122
123
+ static void
124
+ send_manifest ( fd_snapin_tile_t * ctx ,
125
+ fd_stem_context_t * stem ,
126
+ ulong manifest_sz ) {
127
+ ulong sig = 0UL ;
128
+ ulong external_sig = 0UL ;
129
+ if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
130
+ sig = FD_FULL_SNAPSHOT_MANIFEST ;
131
+ external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
132
+ } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
133
+ sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
134
+ external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
135
+ }
136
+
137
+ /* Send snapshot manifest message over snap_out link */
138
+ fd_stem_publish ( stem ,
139
+ 0UL ,
140
+ sig ,
141
+ ctx -> manifest_out .chunk ,
142
+ sizeof (fd_snapshot_manifest_t ),
143
+ 0UL ,
144
+ 0UL ,
145
+ 0UL );
146
+ ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
147
+ sizeof (fd_snapshot_manifest_t ),
148
+ ctx -> manifest_out .chunk0 ,
149
+ ctx -> manifest_out .wmark );
150
+
151
+ /* send manifest over replay manifest dcache */
152
+ fd_stem_publish ( stem ,
153
+ 0UL ,
154
+ external_sig ,
155
+ ctx -> replay_manifest_dcache .chunk ,
156
+ manifest_sz ,
157
+ 0UL ,
158
+ ctx -> replay_manifest_dcache .obj_id ,
159
+ 0UL );
160
+ ctx -> replay_manifest_dcache .chunk = fd_dcache_compact_next ( ctx -> replay_manifest_dcache .chunk ,
161
+ manifest_sz ,
162
+ ctx -> replay_manifest_dcache .chunk0 ,
163
+ ctx -> replay_manifest_dcache .wmark );
164
+ }
165
+
120
166
/* Snapshot parser callbacks ******************************************/
121
167
122
168
static void
123
- save_manifest ( fd_snapshot_parser_t * parser ,
124
- void * _ctx ,
125
- fd_solana_manifest_global_t * manifest ,
126
- ulong manifest_sz ) {
169
+ handle_manifest ( fd_snapshot_parser_t * parser ,
170
+ void * _ctx ,
171
+ fd_stem_context_t * stem ,
172
+ fd_solana_manifest_global_t * manifest ,
173
+ ulong manifest_sz ) {
127
174
(void )parser ;
128
175
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
129
176
@@ -135,10 +182,13 @@ save_manifest( fd_snapshot_parser_t * parser,
135
182
FD_LOG_NOTICE (( "Snapshot manifest loaded for slot %lu" , snapshot_manifest_mem -> slot ));
136
183
137
184
/* Send decoded manifest to replay */
138
- fd_memcpy ( ctx -> replay_manifest_dcache ,
185
+ uchar * next_dcache_mem = fd_chunk_to_laddr ( ctx -> replay_manifest_dcache .wksp ,
186
+ ctx -> replay_manifest_dcache .chunk );
187
+ fd_memcpy ( next_dcache_mem ,
139
188
manifest ,
140
189
manifest_sz );
141
- ctx -> manifest_sz = manifest_sz ;
190
+
191
+ send_manifest ( ctx , stem , manifest_sz );
142
192
}
143
193
144
194
static int
@@ -167,7 +217,9 @@ snapshot_is_duplicate_account( fd_snapshot_parser_t * parser,
167
217
static void
168
218
snapshot_insert_account ( fd_snapshot_parser_t * parser ,
169
219
fd_solana_account_hdr_t const * hdr ,
170
- void * _ctx ) {
220
+ void * _ctx ,
221
+ fd_stem_context_t * stem ) {
222
+ (void )stem ;
171
223
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
172
224
fd_pubkey_t const * account_key = fd_type_pun_const ( hdr -> meta .pubkey );
173
225
@@ -195,10 +247,13 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
195
247
}
196
248
197
249
static void
198
- snapshot_copy_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
250
+ snapshot_copy_acc_data ( fd_snapshot_parser_t * parser ,
199
251
void * _ctx ,
252
+ fd_stem_context_t * stem ,
200
253
uchar const * buf ,
201
254
ulong data_sz ) {
255
+ (void )parser ;
256
+ (void )stem ;
202
257
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
203
258
204
259
if ( ctx -> acc_data ) {
@@ -208,8 +263,11 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
208
263
}
209
264
210
265
static void
211
- snapshot_reset_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
212
- void * _ctx ) {
266
+ snapshot_reset_acc_data ( fd_snapshot_parser_t * parser ,
267
+ void * _ctx ,
268
+ fd_stem_context_t * stem ) {
269
+ (void )parser ;
270
+ (void )stem ;
213
271
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
214
272
ctx -> acc_data = NULL ;
215
273
}
@@ -228,6 +286,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) {
228
286
l = FD_LAYOUT_APPEND ( l , alignof(fd_snapin_tile_t ), sizeof (fd_snapin_tile_t ) );
229
287
l = FD_LAYOUT_APPEND ( l , fd_snapshot_parser_align (), fd_snapshot_parser_footprint () );
230
288
l = FD_LAYOUT_APPEND ( l , fd_scratch_smem_align (), fd_scratch_smem_footprint ( FD_SNAPIN_SCRATCH_MAX ) );
289
+ l = FD_LAYOUT_APPEND ( l , fd_scratch_fmem_align (), fd_scratch_fmem_footprint ( FD_SNAPIN_SCRATCH_DEPTH ) );
231
290
return FD_LAYOUT_FINI ( l , alignof(fd_snapin_tile_t ) );
232
291
}
233
292
@@ -245,7 +304,7 @@ unprivileged_init( fd_topo_t * topo,
245
304
246
305
fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL ;
247
306
if ( 0 == strcmp ( topo -> links [tile -> out_link_id [ MANIFEST_OUT_IDX ]].name , "snap_out" ) ) {
248
- manifest_cb = save_manifest ;
307
+ manifest_cb = handle_manifest ;
249
308
}
250
309
251
310
ctx -> shutdown = 0 ;
@@ -276,18 +335,23 @@ unprivileged_init( fd_topo_t * topo,
276
335
ctx -> metrics .incremental .accounts_processed = 0UL ;
277
336
ctx -> metrics .num_accounts_inserted = 0UL ;
278
337
279
- /* join replay manifest dcache */
280
- ctx -> replay_manifest_dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
281
- ctx -> replay_manifest_dcache_obj_id = tile -> snapin .manifest_dcache_obj_id ;
282
- ctx -> manifest_sz = 0UL ;
283
-
284
338
/* set up the manifest message producer */
285
339
fd_topo_link_t * writer_link = & topo -> links [ tile -> out_link_id [ MANIFEST_OUT_IDX ] ];
286
340
ctx -> manifest_out .wksp = topo -> workspaces [ topo -> objs [ writer_link -> dcache_obj_id ].wksp_id ].wksp ;
287
341
ctx -> manifest_out .chunk0 = fd_dcache_compact_chunk0 ( fd_wksp_containing ( writer_link -> dcache ), writer_link -> dcache );
288
342
ctx -> manifest_out .wmark = fd_dcache_compact_wmark ( ctx -> manifest_out .wksp , writer_link -> dcache , writer_link -> mtu );
289
343
ctx -> manifest_out .chunk = ctx -> manifest_out .chunk0 ;
290
344
345
+ /* join replay manifest dcache */
346
+ ctx -> replay_manifest_dcache .dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
347
+ ctx -> replay_manifest_dcache .wksp = fd_wksp_containing ( ctx -> replay_manifest_dcache .dcache );
348
+ ctx -> replay_manifest_dcache .obj_id = tile -> snapin .manifest_dcache_obj_id ;
349
+ ctx -> replay_manifest_dcache .chunk0 = fd_dcache_compact_chunk0 ( ctx -> replay_manifest_dcache .wksp , ctx -> replay_manifest_dcache .dcache );
350
+ ctx -> replay_manifest_dcache .chunk = ctx -> replay_manifest_dcache .chunk0 ;
351
+ ctx -> replay_manifest_dcache .wmark = fd_dcache_compact_wmark ( ctx -> replay_manifest_dcache .wksp ,
352
+ ctx -> replay_manifest_dcache .dcache ,
353
+ writer_link -> mtu );
354
+
291
355
/* set up in link */
292
356
fd_topo_link_t const * in_link = & topo -> links [ tile -> in_link_id [ SNAPSHOT_IN_LINK_IDX ] ];
293
357
fd_topo_wksp_t const * in_wksp = & topo -> workspaces [ topo -> objs [ in_link -> dcache_obj_id ].wksp_id ];
@@ -332,50 +396,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
332
396
/* TODO: Assert that hard reset suceeded */
333
397
}
334
398
335
- static void
336
- send_manifest ( fd_snapin_tile_t * ctx ,
337
- fd_stem_context_t * stem ) {
338
- /* Assumes the manifest is already mem copied into the snap_out
339
- dcache and the replay_manifest_dcache from the save_manifest
340
- callback. */
341
- FD_TEST ( ctx -> manifest_sz );
342
-
343
- ulong sig = 0UL ;
344
- ulong external_sig = 0UL ;
345
- if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
346
- sig = FD_FULL_SNAPSHOT_MANIFEST ;
347
- external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
348
- } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
349
- sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
350
- external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
351
- }
352
-
353
- /* Send snapshot manifest message over snap_out link */
354
- fd_stem_publish ( stem ,
355
- 0UL ,
356
- sig ,
357
- ctx -> manifest_out .chunk ,
358
- sizeof (fd_snapshot_manifest_t ),
359
- 0UL ,
360
- 0UL ,
361
- 0UL );
362
- ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
363
- sizeof (fd_snapshot_manifest_t ),
364
- ctx -> manifest_out .chunk0 ,
365
- ctx -> manifest_out .wmark );
366
-
367
- /* send manifest over replay manifest dcache */
368
- ulong chunk = fd_dcache_compact_chunk0 ( fd_wksp_containing ( ctx -> replay_manifest_dcache ), ctx -> replay_manifest_dcache );
369
- fd_stem_publish ( stem ,
370
- 0UL ,
371
- external_sig ,
372
- chunk ,
373
- ctx -> manifest_sz ,
374
- 0UL ,
375
- ctx -> replay_manifest_dcache_obj_id ,
376
- 0UL );
377
- }
378
-
379
399
static void
380
400
handle_control_frag ( fd_snapin_tile_t * ctx ,
381
401
fd_stem_context_t * stem ,
@@ -389,10 +409,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
389
409
0 );
390
410
}
391
411
392
- /* Once the snapshot is fully loaded, we can send the manifest
393
- message over. */
394
- send_manifest ( ctx , stem );
395
-
396
412
/* Notify consumers of manifest out that the snapshot is fully
397
413
loaded. */
398
414
fd_stem_publish ( stem ,
@@ -442,9 +458,10 @@ handle_control_frag( fd_snapin_tile_t * ctx,
442
458
}
443
459
444
460
static void
445
- handle_data_frag ( fd_snapin_tile_t * ctx ,
446
- ulong chunk ,
447
- ulong sz ) {
461
+ handle_data_frag ( fd_snapin_tile_t * ctx ,
462
+ ulong chunk ,
463
+ ulong sz ,
464
+ fd_stem_context_t * stem ) {
448
465
FD_TEST ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ||
449
466
ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL );
450
467
FD_TEST ( chunk >=ctx -> in .chunk0 && chunk <=ctx -> in .wmark );
@@ -465,7 +482,8 @@ handle_data_frag( fd_snapin_tile_t * ctx,
465
482
}
466
483
cur = fd_snapshot_parser_process_chunk ( ctx -> parser ,
467
484
cur ,
468
- (ulong )( chunk_end - cur ) );
485
+ (ulong )( chunk_end - cur ),
486
+ stem );
469
487
if ( FD_UNLIKELY ( ctx -> parser -> flags ) ) {
470
488
if ( FD_UNLIKELY ( ctx -> parser -> flags & SNAP_FLAG_FAILED ) ) {
471
489
/* abort app if parser failed */
@@ -517,7 +535,7 @@ after_frag( fd_snapin_tile_t * ctx,
517
535
518
536
/* handle frag */
519
537
if ( FD_UNLIKELY ( sz == 0 ) ) handle_control_frag ( ctx , stem , sig );
520
- else handle_data_frag ( ctx , ctx -> in ._chunk , sz );
538
+ else handle_data_frag ( ctx , ctx -> in ._chunk , sz , stem );
521
539
}
522
540
523
541
#define STEM_BURST 1UL
0 commit comments