@@ -82,11 +82,14 @@ struct fd_snapin_tile {
82
82
/* A shared dcache object between snapin and replay that holds the
83
83
decoded solana manifest.
84
84
TODO: remove when replay can receive the snapshot manifest. */
85
- uchar * replay_manifest_dcache ;
86
- ulong replay_manifest_dcache_obj_id ;
87
-
88
- /* TODO: remove when replay can receive the snapshot manifest. */
89
- ulong manifest_sz ;
85
+ struct {
86
+ fd_wksp_t * wksp ;
87
+ uchar * dcache ;
88
+ ulong chunk0 ;
89
+ ulong wmark ;
90
+ ulong chunk ;
91
+ ulong obj_id ;
92
+ } replay_manifest_dcache ;
90
93
91
94
int shutdown ;
92
95
@@ -117,13 +120,58 @@ fd_snapin_accumulate_metrics( fd_snapin_tile_t * ctx ) {
117
120
}
118
121
}
119
122
123
+ static void
124
+ send_manifest ( fd_snapin_tile_t * ctx ,
125
+ fd_stem_context_t * stem ,
126
+ ulong manifest_sz ) {
127
+ ulong sig = 0UL ;
128
+ ulong external_sig = 0UL ;
129
+ if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
130
+ sig = FD_FULL_SNAPSHOT_MANIFEST ;
131
+ external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
132
+ } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
133
+ sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
134
+ external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
135
+ }
136
+
137
+ /* Send snapshot manifest message over snap_out link */
138
+ fd_stem_publish ( stem ,
139
+ 0UL ,
140
+ sig ,
141
+ ctx -> manifest_out .chunk ,
142
+ sizeof (fd_snapshot_manifest_t ),
143
+ 0UL ,
144
+ 0UL ,
145
+ 0UL );
146
+ ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
147
+ sizeof (fd_snapshot_manifest_t ),
148
+ ctx -> manifest_out .chunk0 ,
149
+ ctx -> manifest_out .wmark );
150
+
151
+ /* send manifest over replay manifest dcache */
152
+ fd_stem_publish ( stem ,
153
+ 0UL ,
154
+ external_sig ,
155
+ ctx -> replay_manifest_dcache .chunk ,
156
+ manifest_sz ,
157
+ 0UL ,
158
+ ctx -> replay_manifest_dcache .obj_id ,
159
+ 0UL );
160
+ ctx -> replay_manifest_dcache .chunk = fd_dcache_compact_next ( ctx -> replay_manifest_dcache .chunk ,
161
+ manifest_sz ,
162
+ ctx -> replay_manifest_dcache .chunk0 ,
163
+ ctx -> replay_manifest_dcache .wmark );
164
+ FD_TEST ( ctx -> replay_manifest_dcache .chunk <= ctx -> replay_manifest_dcache .wmark );
165
+ }
166
+
120
167
/* Snapshot parser callbacks ******************************************/
121
168
122
169
static void
123
- save_manifest ( fd_snapshot_parser_t * parser ,
124
- void * _ctx ,
125
- fd_solana_manifest_global_t * manifest ,
126
- ulong manifest_sz ) {
170
+ handle_manifest ( fd_snapshot_parser_t * parser ,
171
+ void * _ctx ,
172
+ fd_stem_context_t * stem ,
173
+ fd_solana_manifest_global_t * manifest ,
174
+ ulong manifest_sz ) {
127
175
(void )parser ;
128
176
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
129
177
@@ -135,10 +183,13 @@ save_manifest( fd_snapshot_parser_t * parser,
135
183
FD_LOG_NOTICE (( "Snapshot manifest loaded for slot %lu" , snapshot_manifest_mem -> slot ));
136
184
137
185
/* Send decoded manifest to replay */
138
- fd_memcpy ( ctx -> replay_manifest_dcache ,
186
+ uchar * next_dcache_mem = fd_chunk_to_laddr ( ctx -> replay_manifest_dcache .wksp ,
187
+ ctx -> replay_manifest_dcache .chunk );
188
+ fd_memcpy ( next_dcache_mem ,
139
189
manifest ,
140
190
manifest_sz );
141
- ctx -> manifest_sz = manifest_sz ;
191
+
192
+ send_manifest ( ctx , stem , manifest_sz );
142
193
}
143
194
144
195
static int
@@ -167,7 +218,9 @@ snapshot_is_duplicate_account( fd_snapshot_parser_t * parser,
167
218
static void
168
219
snapshot_insert_account ( fd_snapshot_parser_t * parser ,
169
220
fd_solana_account_hdr_t const * hdr ,
170
- void * _ctx ) {
221
+ void * _ctx ,
222
+ fd_stem_context_t * stem ) {
223
+ (void )stem ;
171
224
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
172
225
fd_pubkey_t const * account_key = fd_type_pun_const ( hdr -> meta .pubkey );
173
226
@@ -195,10 +248,13 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
195
248
}
196
249
197
250
static void
198
- snapshot_copy_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
251
+ snapshot_copy_acc_data ( fd_snapshot_parser_t * parser ,
199
252
void * _ctx ,
253
+ fd_stem_context_t * stem ,
200
254
uchar const * buf ,
201
255
ulong data_sz ) {
256
+ (void )parser ;
257
+ (void )stem ;
202
258
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
203
259
204
260
if ( ctx -> acc_data ) {
@@ -208,8 +264,11 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
208
264
}
209
265
210
266
static void
211
- snapshot_reset_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
212
- void * _ctx ) {
267
+ snapshot_reset_acc_data ( fd_snapshot_parser_t * parser ,
268
+ void * _ctx ,
269
+ fd_stem_context_t * stem ) {
270
+ (void )parser ;
271
+ (void )stem ;
213
272
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
214
273
ctx -> acc_data = NULL ;
215
274
}
@@ -228,6 +287,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) {
228
287
l = FD_LAYOUT_APPEND ( l , alignof(fd_snapin_tile_t ), sizeof (fd_snapin_tile_t ) );
229
288
l = FD_LAYOUT_APPEND ( l , fd_snapshot_parser_align (), fd_snapshot_parser_footprint () );
230
289
l = FD_LAYOUT_APPEND ( l , fd_scratch_smem_align (), fd_scratch_smem_footprint ( FD_SNAPIN_SCRATCH_MAX ) );
290
+ l = FD_LAYOUT_APPEND ( l , fd_scratch_fmem_align (), fd_scratch_fmem_footprint ( FD_SNAPIN_SCRATCH_DEPTH ) );
231
291
return FD_LAYOUT_FINI ( l , alignof(fd_snapin_tile_t ) );
232
292
}
233
293
@@ -245,7 +305,7 @@ unprivileged_init( fd_topo_t * topo,
245
305
246
306
fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL ;
247
307
if ( 0 == strcmp ( topo -> links [tile -> out_link_id [ MANIFEST_OUT_IDX ]].name , "snap_out" ) ) {
248
- manifest_cb = save_manifest ;
308
+ manifest_cb = handle_manifest ;
249
309
}
250
310
251
311
ctx -> shutdown = 0 ;
@@ -276,18 +336,23 @@ unprivileged_init( fd_topo_t * topo,
276
336
ctx -> metrics .incremental .accounts_processed = 0UL ;
277
337
ctx -> metrics .num_accounts_inserted = 0UL ;
278
338
279
- /* join replay manifest dcache */
280
- ctx -> replay_manifest_dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
281
- ctx -> replay_manifest_dcache_obj_id = tile -> snapin .manifest_dcache_obj_id ;
282
- ctx -> manifest_sz = 0UL ;
283
-
284
339
/* set up the manifest message producer */
285
340
fd_topo_link_t * writer_link = & topo -> links [ tile -> out_link_id [ MANIFEST_OUT_IDX ] ];
286
341
ctx -> manifest_out .wksp = topo -> workspaces [ topo -> objs [ writer_link -> dcache_obj_id ].wksp_id ].wksp ;
287
342
ctx -> manifest_out .chunk0 = fd_dcache_compact_chunk0 ( fd_wksp_containing ( writer_link -> dcache ), writer_link -> dcache );
288
343
ctx -> manifest_out .wmark = fd_dcache_compact_wmark ( ctx -> manifest_out .wksp , writer_link -> dcache , writer_link -> mtu );
289
344
ctx -> manifest_out .chunk = ctx -> manifest_out .chunk0 ;
290
345
346
+ /* join replay manifest dcache */
347
+ ctx -> replay_manifest_dcache .dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
348
+ ctx -> replay_manifest_dcache .wksp = fd_wksp_containing ( ctx -> replay_manifest_dcache .dcache );
349
+ ctx -> replay_manifest_dcache .obj_id = tile -> snapin .manifest_dcache_obj_id ;
350
+ ctx -> replay_manifest_dcache .chunk0 = fd_dcache_compact_chunk0 ( ctx -> replay_manifest_dcache .wksp , ctx -> replay_manifest_dcache .dcache );
351
+ ctx -> replay_manifest_dcache .chunk = ctx -> replay_manifest_dcache .chunk0 ;
352
+ ctx -> replay_manifest_dcache .wmark = fd_dcache_compact_wmark ( ctx -> replay_manifest_dcache .wksp ,
353
+ ctx -> replay_manifest_dcache .dcache ,
354
+ writer_link -> mtu );
355
+
291
356
/* set up in link */
292
357
fd_topo_link_t const * in_link = & topo -> links [ tile -> in_link_id [ SNAPSHOT_IN_LINK_IDX ] ];
293
358
fd_topo_wksp_t const * in_wksp = & topo -> workspaces [ topo -> objs [ in_link -> dcache_obj_id ].wksp_id ];
@@ -332,50 +397,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
332
397
/* TODO: Assert that hard reset suceeded */
333
398
}
334
399
335
- static void
336
- send_manifest ( fd_snapin_tile_t * ctx ,
337
- fd_stem_context_t * stem ) {
338
- /* Assumes the manifest is already mem copied into the snap_out
339
- dcache and the replay_manifest_dcache from the save_manifest
340
- callback. */
341
- FD_TEST ( ctx -> manifest_sz );
342
-
343
- ulong sig = 0UL ;
344
- ulong external_sig = 0UL ;
345
- if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
346
- sig = FD_FULL_SNAPSHOT_MANIFEST ;
347
- external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
348
- } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
349
- sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
350
- external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
351
- }
352
-
353
- /* Send snapshot manifest message over snap_out link */
354
- fd_stem_publish ( stem ,
355
- 0UL ,
356
- sig ,
357
- ctx -> manifest_out .chunk ,
358
- sizeof (fd_snapshot_manifest_t ),
359
- 0UL ,
360
- 0UL ,
361
- 0UL );
362
- ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
363
- sizeof (fd_snapshot_manifest_t ),
364
- ctx -> manifest_out .chunk0 ,
365
- ctx -> manifest_out .wmark );
366
-
367
- /* send manifest over replay manifest dcache */
368
- ulong chunk = fd_dcache_compact_chunk0 ( fd_wksp_containing ( ctx -> replay_manifest_dcache ), ctx -> replay_manifest_dcache );
369
- fd_stem_publish ( stem ,
370
- 0UL ,
371
- external_sig ,
372
- chunk ,
373
- ctx -> manifest_sz ,
374
- 0UL ,
375
- ctx -> replay_manifest_dcache_obj_id ,
376
- 0UL );
377
- }
378
-
379
400
static void
380
401
handle_control_frag ( fd_snapin_tile_t * ctx ,
381
402
fd_stem_context_t * stem ,
@@ -389,10 +410,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
389
410
0 );
390
411
}
391
412
392
- /* Once the snapshot is fully loaded, we can send the manifest
393
- message over. */
394
- send_manifest ( ctx , stem );
395
-
396
413
/* Notify consumers of manifest out that the snapshot is fully
397
414
loaded. */
398
415
fd_stem_publish ( stem ,
@@ -442,9 +459,10 @@ handle_control_frag( fd_snapin_tile_t * ctx,
442
459
}
443
460
444
461
static void
445
- handle_data_frag ( fd_snapin_tile_t * ctx ,
446
- ulong chunk ,
447
- ulong sz ) {
462
+ handle_data_frag ( fd_snapin_tile_t * ctx ,
463
+ ulong chunk ,
464
+ ulong sz ,
465
+ fd_stem_context_t * stem ) {
448
466
FD_TEST ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ||
449
467
ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL );
450
468
FD_TEST ( chunk >=ctx -> in .chunk0 && chunk <=ctx -> in .wmark );
@@ -465,7 +483,8 @@ handle_data_frag( fd_snapin_tile_t * ctx,
465
483
}
466
484
cur = fd_snapshot_parser_process_chunk ( ctx -> parser ,
467
485
cur ,
468
- (ulong )( chunk_end - cur ) );
486
+ (ulong )( chunk_end - cur ),
487
+ stem );
469
488
if ( FD_UNLIKELY ( ctx -> parser -> flags ) ) {
470
489
if ( FD_UNLIKELY ( ctx -> parser -> flags & SNAP_FLAG_FAILED ) ) {
471
490
/* abort app if parser failed */
@@ -517,7 +536,7 @@ after_frag( fd_snapin_tile_t * ctx,
517
536
518
537
/* handle frag */
519
538
if ( FD_UNLIKELY ( sz == 0 ) ) handle_control_frag ( ctx , stem , sig );
520
- else handle_data_frag ( ctx , ctx -> in ._chunk , sz );
539
+ else handle_data_frag ( ctx , ctx -> in ._chunk , sz , stem );
521
540
}
522
541
523
542
#define STEM_BURST 1UL
0 commit comments