@@ -82,11 +82,14 @@ struct fd_snapin_tile {
82
82
/* A shared dcache object between snapin and replay that holds the
83
83
decoded solana manifest.
84
84
TODO: remove when replay can receive the snapshot manifest. */
85
- uchar * replay_manifest_dcache ;
86
- ulong replay_manifest_dcache_obj_id ;
87
-
88
- /* TODO: remove when replay can receive the snapshot manifest. */
89
- ulong manifest_sz ;
85
+ struct {
86
+ fd_wksp_t * wksp ;
87
+ uchar * dcache ;
88
+ ulong chunk0 ;
89
+ ulong wmark ;
90
+ ulong chunk ;
91
+ ulong obj_id ;
92
+ } replay_manifest_dcache ;
90
93
91
94
int shutdown ;
92
95
@@ -117,13 +120,57 @@ fd_snapin_accumulate_metrics( fd_snapin_tile_t * ctx ) {
117
120
}
118
121
}
119
122
123
+ static void
124
+ send_manifest ( fd_snapin_tile_t * ctx ,
125
+ fd_stem_context_t * stem ,
126
+ ulong manifest_sz ) {
127
+ ulong sig = 0UL ;
128
+ ulong external_sig = 0UL ;
129
+ if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
130
+ sig = FD_FULL_SNAPSHOT_MANIFEST ;
131
+ external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
132
+ } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
133
+ sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
134
+ external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
135
+ }
136
+
137
+ /* Send snapshot manifest message over snap_out link */
138
+ fd_stem_publish ( stem ,
139
+ 0UL ,
140
+ sig ,
141
+ ctx -> manifest_out .chunk ,
142
+ sizeof (fd_snapshot_manifest_t ),
143
+ 0UL ,
144
+ 0UL ,
145
+ 0UL );
146
+ ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
147
+ sizeof (fd_snapshot_manifest_t ),
148
+ ctx -> manifest_out .chunk0 ,
149
+ ctx -> manifest_out .wmark );
150
+
151
+ /* send manifest over replay manifest dcache */
152
+ fd_stem_publish ( stem ,
153
+ 0UL ,
154
+ external_sig ,
155
+ ctx -> replay_manifest_dcache .chunk ,
156
+ manifest_sz ,
157
+ 0UL ,
158
+ ctx -> replay_manifest_dcache .obj_id ,
159
+ 0UL );
160
+ ctx -> replay_manifest_dcache .chunk = fd_dcache_compact_next ( ctx -> replay_manifest_dcache .chunk ,
161
+ manifest_sz ,
162
+ ctx -> replay_manifest_dcache .chunk0 ,
163
+ ctx -> replay_manifest_dcache .wmark );
164
+ }
165
+
120
166
/* Snapshot parser callbacks ******************************************/
121
167
122
168
static void
123
- save_manifest ( fd_snapshot_parser_t * parser ,
124
- void * _ctx ,
125
- fd_solana_manifest_global_t * manifest ,
126
- ulong manifest_sz ) {
169
+ handle_manifest ( fd_snapshot_parser_t * parser ,
170
+ void * _ctx ,
171
+ fd_stem_context_t * stem ,
172
+ fd_solana_manifest_global_t * manifest ,
173
+ ulong manifest_sz ) {
127
174
(void )parser ;
128
175
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
129
176
@@ -135,10 +182,13 @@ save_manifest( fd_snapshot_parser_t * parser,
135
182
FD_LOG_NOTICE (( "Snapshot manifest loaded for slot %lu" , snapshot_manifest_mem -> slot ));
136
183
137
184
/* Send decoded manifest to replay */
138
- fd_memcpy ( ctx -> replay_manifest_dcache ,
185
+ uchar * next_dcache_mem = fd_chunk_to_laddr ( ctx -> replay_manifest_dcache .wksp ,
186
+ ctx -> replay_manifest_dcache .chunk );
187
+ fd_memcpy ( next_dcache_mem ,
139
188
manifest ,
140
189
manifest_sz );
141
- ctx -> manifest_sz = manifest_sz ;
190
+
191
+ send_manifest ( ctx , stem , manifest_sz );
142
192
}
143
193
144
194
static int
@@ -167,7 +217,9 @@ snapshot_is_duplicate_account( fd_snapshot_parser_t * parser,
167
217
static void
168
218
snapshot_insert_account ( fd_snapshot_parser_t * parser ,
169
219
fd_solana_account_hdr_t const * hdr ,
170
- void * _ctx ) {
220
+ void * _ctx ,
221
+ fd_stem_context_t * stem ) {
222
+ (void )stem ;
171
223
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
172
224
fd_pubkey_t const * account_key = fd_type_pun_const ( hdr -> meta .pubkey );
173
225
@@ -195,10 +247,13 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
195
247
}
196
248
197
249
static void
198
- snapshot_copy_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
250
+ snapshot_copy_acc_data ( fd_snapshot_parser_t * parser ,
199
251
void * _ctx ,
252
+ fd_stem_context_t * stem ,
200
253
uchar const * buf ,
201
254
ulong data_sz ) {
255
+ (void )parser ;
256
+ (void )stem ;
202
257
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
203
258
204
259
if ( ctx -> acc_data ) {
@@ -208,8 +263,11 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
208
263
}
209
264
210
265
static void
211
- snapshot_reset_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
212
- void * _ctx ) {
266
+ snapshot_reset_acc_data ( fd_snapshot_parser_t * parser ,
267
+ void * _ctx ,
268
+ fd_stem_context_t * stem ) {
269
+ (void )parser ;
270
+ (void )stem ;
213
271
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
214
272
ctx -> acc_data = NULL ;
215
273
}
@@ -245,7 +303,7 @@ unprivileged_init( fd_topo_t * topo,
245
303
246
304
fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL ;
247
305
if ( 0 == strcmp ( topo -> links [tile -> out_link_id [ MANIFEST_OUT_IDX ]].name , "snap_out" ) ) {
248
- manifest_cb = save_manifest ;
306
+ manifest_cb = handle_manifest ;
249
307
}
250
308
251
309
ctx -> shutdown = 0 ;
@@ -276,18 +334,23 @@ unprivileged_init( fd_topo_t * topo,
276
334
ctx -> metrics .incremental .accounts_processed = 0UL ;
277
335
ctx -> metrics .num_accounts_inserted = 0UL ;
278
336
279
- /* join replay manifest dcache */
280
- ctx -> replay_manifest_dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
281
- ctx -> replay_manifest_dcache_obj_id = tile -> snapin .manifest_dcache_obj_id ;
282
- ctx -> manifest_sz = 0UL ;
283
-
284
337
/* set up the manifest message producer */
285
338
fd_topo_link_t * writer_link = & topo -> links [ tile -> out_link_id [ MANIFEST_OUT_IDX ] ];
286
339
ctx -> manifest_out .wksp = topo -> workspaces [ topo -> objs [ writer_link -> dcache_obj_id ].wksp_id ].wksp ;
287
340
ctx -> manifest_out .chunk0 = fd_dcache_compact_chunk0 ( fd_wksp_containing ( writer_link -> dcache ), writer_link -> dcache );
288
341
ctx -> manifest_out .wmark = fd_dcache_compact_wmark ( ctx -> manifest_out .wksp , writer_link -> dcache , writer_link -> mtu );
289
342
ctx -> manifest_out .chunk = ctx -> manifest_out .chunk0 ;
290
343
344
+ /* join replay manifest dcache */
345
+ ctx -> replay_manifest_dcache .dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
346
+ ctx -> replay_manifest_dcache .wksp = fd_wksp_containing ( ctx -> replay_manifest_dcache .dcache );
347
+ ctx -> replay_manifest_dcache .obj_id = tile -> snapin .manifest_dcache_obj_id ;
348
+ ctx -> replay_manifest_dcache .chunk0 = fd_dcache_compact_chunk0 ( ctx -> replay_manifest_dcache .wksp , ctx -> replay_manifest_dcache .dcache );
349
+ ctx -> replay_manifest_dcache .chunk = ctx -> replay_manifest_dcache .chunk0 ;
350
+ ctx -> replay_manifest_dcache .wmark = fd_dcache_compact_wmark ( ctx -> replay_manifest_dcache .wksp ,
351
+ ctx -> replay_manifest_dcache .dcache ,
352
+ writer_link -> mtu );
353
+
291
354
/* set up in link */
292
355
fd_topo_link_t const * in_link = & topo -> links [ tile -> in_link_id [ SNAPSHOT_IN_LINK_IDX ] ];
293
356
fd_topo_wksp_t const * in_wksp = & topo -> workspaces [ topo -> objs [ in_link -> dcache_obj_id ].wksp_id ];
@@ -332,50 +395,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
332
395
/* TODO: Assert that hard reset suceeded */
333
396
}
334
397
335
- static void
336
- send_manifest ( fd_snapin_tile_t * ctx ,
337
- fd_stem_context_t * stem ) {
338
- /* Assumes the manifest is already mem copied into the snap_out
339
- dcache and the replay_manifest_dcache from the save_manifest
340
- callback. */
341
- FD_TEST ( ctx -> manifest_sz );
342
-
343
- ulong sig = 0UL ;
344
- ulong external_sig = 0UL ;
345
- if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
346
- sig = FD_FULL_SNAPSHOT_MANIFEST ;
347
- external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
348
- } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
349
- sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
350
- external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
351
- }
352
-
353
- /* Send snapshot manifest message over snap_out link */
354
- fd_stem_publish ( stem ,
355
- 0UL ,
356
- sig ,
357
- ctx -> manifest_out .chunk ,
358
- sizeof (fd_snapshot_manifest_t ),
359
- 0UL ,
360
- 0UL ,
361
- 0UL );
362
- ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
363
- sizeof (fd_snapshot_manifest_t ),
364
- ctx -> manifest_out .chunk0 ,
365
- ctx -> manifest_out .wmark );
366
-
367
- /* send manifest over replay manifest dcache */
368
- ulong chunk = fd_dcache_compact_chunk0 ( fd_wksp_containing ( ctx -> replay_manifest_dcache ), ctx -> replay_manifest_dcache );
369
- fd_stem_publish ( stem ,
370
- 0UL ,
371
- external_sig ,
372
- chunk ,
373
- ctx -> manifest_sz ,
374
- 0UL ,
375
- ctx -> replay_manifest_dcache_obj_id ,
376
- 0UL );
377
- }
378
-
379
398
static void
380
399
handle_control_frag ( fd_snapin_tile_t * ctx ,
381
400
fd_stem_context_t * stem ,
@@ -389,10 +408,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
389
408
0 );
390
409
}
391
410
392
- /* Once the snapshot is fully loaded, we can send the manifest
393
- message over. */
394
- send_manifest ( ctx , stem );
395
-
396
411
/* Notify consumers of manifest out that the snapshot is fully
397
412
loaded. */
398
413
fd_stem_publish ( stem ,
@@ -442,9 +457,10 @@ handle_control_frag( fd_snapin_tile_t * ctx,
442
457
}
443
458
444
459
static void
445
- handle_data_frag ( fd_snapin_tile_t * ctx ,
446
- ulong chunk ,
447
- ulong sz ) {
460
+ handle_data_frag ( fd_snapin_tile_t * ctx ,
461
+ ulong chunk ,
462
+ ulong sz ,
463
+ fd_stem_context_t * stem ) {
448
464
FD_TEST ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ||
449
465
ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL );
450
466
FD_TEST ( chunk >=ctx -> in .chunk0 && chunk <=ctx -> in .wmark );
@@ -465,7 +481,8 @@ handle_data_frag( fd_snapin_tile_t * ctx,
465
481
}
466
482
cur = fd_snapshot_parser_process_chunk ( ctx -> parser ,
467
483
cur ,
468
- (ulong )( chunk_end - cur ) );
484
+ (ulong )( chunk_end - cur ),
485
+ stem );
469
486
if ( FD_UNLIKELY ( ctx -> parser -> flags ) ) {
470
487
if ( FD_UNLIKELY ( ctx -> parser -> flags & SNAP_FLAG_FAILED ) ) {
471
488
/* abort app if parser failed */
@@ -517,7 +534,7 @@ after_frag( fd_snapin_tile_t * ctx,
517
534
518
535
/* handle frag */
519
536
if ( FD_UNLIKELY ( sz == 0 ) ) handle_control_frag ( ctx , stem , sig );
520
- else handle_data_frag ( ctx , ctx -> in ._chunk , sz );
537
+ else handle_data_frag ( ctx , ctx -> in ._chunk , sz , stem );
521
538
}
522
539
523
540
#define STEM_BURST 1UL
0 commit comments