@@ -60,6 +60,11 @@ struct fd_snapin_tile {
60
60
/* State machine */
61
61
int state ;
62
62
63
+ /* Stem pointer
64
+ TODO: a hack to store the stem pointer so that it can be used
65
+ in snapshot parser callbacks. */
66
+ fd_stem_context_t * stem ;
67
+
63
68
/* Account insertion */
64
69
fd_funk_t funk [1 ];
65
70
fd_funk_txn_t * funk_txn ;
@@ -82,11 +87,14 @@ struct fd_snapin_tile {
82
87
/* A shared dcache object between snapin and replay that holds the
83
88
decoded solana manifest.
84
89
TODO: remove when replay can receive the snapshot manifest. */
85
- uchar * replay_manifest_dcache ;
86
- ulong replay_manifest_dcache_obj_id ;
87
-
88
- /* TODO: remove when replay can receive the snapshot manifest. */
89
- ulong manifest_sz ;
90
+ struct {
91
+ fd_wksp_t * wksp ;
92
+ uchar * dcache ;
93
+ ulong chunk0 ;
94
+ ulong wmark ;
95
+ ulong chunk ;
96
+ ulong obj_id ;
97
+ } replay_manifest_dcache ;
90
98
91
99
int shutdown ;
92
100
@@ -117,13 +125,56 @@ fd_snapin_accumulate_metrics( fd_snapin_tile_t * ctx ) {
117
125
}
118
126
}
119
127
128
+ static void
129
+ send_manifest ( fd_snapin_tile_t * ctx ,
130
+ ulong manifest_sz ) {
131
+ ulong sig = 0UL ;
132
+ ulong external_sig = 0UL ;
133
+ if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
134
+ sig = FD_FULL_SNAPSHOT_MANIFEST ;
135
+ external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
136
+ } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
137
+ sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
138
+ external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
139
+ }
140
+
141
+ /* Send snapshot manifest message over snap_out link */
142
+ fd_stem_publish ( ctx -> stem ,
143
+ 0UL ,
144
+ sig ,
145
+ ctx -> manifest_out .chunk ,
146
+ sizeof (fd_snapshot_manifest_t ),
147
+ 0UL ,
148
+ 0UL ,
149
+ 0UL );
150
+ ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
151
+ sizeof (fd_snapshot_manifest_t ),
152
+ ctx -> manifest_out .chunk0 ,
153
+ ctx -> manifest_out .wmark );
154
+
155
+ /* send manifest over replay manifest dcache */
156
+ fd_stem_publish ( ctx -> stem ,
157
+ 0UL ,
158
+ external_sig ,
159
+ ctx -> replay_manifest_dcache .chunk ,
160
+ manifest_sz ,
161
+ 0UL ,
162
+ ctx -> replay_manifest_dcache .obj_id ,
163
+ 0UL );
164
+ ctx -> replay_manifest_dcache .chunk = fd_dcache_compact_next ( ctx -> replay_manifest_dcache .chunk ,
165
+ manifest_sz ,
166
+ ctx -> replay_manifest_dcache .chunk0 ,
167
+ ctx -> replay_manifest_dcache .wmark );
168
+ FD_TEST ( ctx -> replay_manifest_dcache .chunk <= ctx -> replay_manifest_dcache .wmark );
169
+ }
170
+
120
171
/* Snapshot parser callbacks ******************************************/
121
172
122
173
static void
123
- save_manifest ( fd_snapshot_parser_t * parser ,
124
- void * _ctx ,
125
- fd_solana_manifest_global_t * manifest ,
126
- ulong manifest_sz ) {
174
+ handle_manifest ( fd_snapshot_parser_t * parser ,
175
+ void * _ctx ,
176
+ fd_solana_manifest_global_t * manifest ,
177
+ ulong manifest_sz ) {
127
178
(void )parser ;
128
179
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
129
180
@@ -135,10 +186,13 @@ save_manifest( fd_snapshot_parser_t * parser,
135
186
FD_LOG_NOTICE (( "Snapshot manifest loaded for slot %lu" , snapshot_manifest_mem -> slot ));
136
187
137
188
/* Send decoded manifest to replay */
138
- fd_memcpy ( ctx -> replay_manifest_dcache ,
189
+ uchar * next_dcache_mem = fd_chunk_to_laddr ( ctx -> replay_manifest_dcache .wksp ,
190
+ ctx -> replay_manifest_dcache .chunk );
191
+ fd_memcpy ( next_dcache_mem ,
139
192
manifest ,
140
193
manifest_sz );
141
- ctx -> manifest_sz = manifest_sz ;
194
+
195
+ send_manifest ( ctx , manifest_sz );
142
196
}
143
197
144
198
static int
@@ -195,10 +249,11 @@ snapshot_insert_account( fd_snapshot_parser_t * parser,
195
249
}
196
250
197
251
static void
198
- snapshot_copy_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
252
+ snapshot_copy_acc_data ( fd_snapshot_parser_t * parser ,
199
253
void * _ctx ,
200
254
uchar const * buf ,
201
255
ulong data_sz ) {
256
+ (void )parser ;
202
257
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
203
258
204
259
if ( ctx -> acc_data ) {
@@ -208,8 +263,9 @@ snapshot_copy_acc_data( fd_snapshot_parser_t * parser FD_PARAM_UNUSED,
208
263
}
209
264
210
265
static void
211
- snapshot_reset_acc_data ( fd_snapshot_parser_t * parser FD_PARAM_UNUSED ,
266
+ snapshot_reset_acc_data ( fd_snapshot_parser_t * parser ,
212
267
void * _ctx ) {
268
+ (void )parser ;
213
269
fd_snapin_tile_t * ctx = fd_type_pun ( _ctx );
214
270
ctx -> acc_data = NULL ;
215
271
}
@@ -246,7 +302,7 @@ unprivileged_init( fd_topo_t * topo,
246
302
247
303
fd_snapshot_parser_process_manifest_fn_t manifest_cb = NULL ;
248
304
if ( 0 == strcmp ( topo -> links [tile -> out_link_id [ MANIFEST_OUT_IDX ]].name , "snap_out" ) ) {
249
- manifest_cb = save_manifest ;
305
+ manifest_cb = handle_manifest ;
250
306
}
251
307
252
308
ctx -> shutdown = 0 ;
@@ -277,18 +333,23 @@ unprivileged_init( fd_topo_t * topo,
277
333
ctx -> metrics .incremental .accounts_processed = 0UL ;
278
334
ctx -> metrics .num_accounts_inserted = 0UL ;
279
335
280
- /* join replay manifest dcache */
281
- ctx -> replay_manifest_dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
282
- ctx -> replay_manifest_dcache_obj_id = tile -> snapin .manifest_dcache_obj_id ;
283
- ctx -> manifest_sz = 0UL ;
284
-
285
336
/* set up the manifest message producer */
286
337
fd_topo_link_t * writer_link = & topo -> links [ tile -> out_link_id [ MANIFEST_OUT_IDX ] ];
287
338
ctx -> manifest_out .wksp = topo -> workspaces [ topo -> objs [ writer_link -> dcache_obj_id ].wksp_id ].wksp ;
288
339
ctx -> manifest_out .chunk0 = fd_dcache_compact_chunk0 ( fd_wksp_containing ( writer_link -> dcache ), writer_link -> dcache );
289
340
ctx -> manifest_out .wmark = fd_dcache_compact_wmark ( ctx -> manifest_out .wksp , writer_link -> dcache , writer_link -> mtu );
290
341
ctx -> manifest_out .chunk = ctx -> manifest_out .chunk0 ;
291
342
343
+ /* join replay manifest dcache */
344
+ ctx -> replay_manifest_dcache .dcache = fd_topo_obj_laddr ( topo , tile -> snapin .manifest_dcache_obj_id );
345
+ ctx -> replay_manifest_dcache .wksp = fd_wksp_containing ( ctx -> replay_manifest_dcache .dcache );
346
+ ctx -> replay_manifest_dcache .obj_id = tile -> snapin .manifest_dcache_obj_id ;
347
+ ctx -> replay_manifest_dcache .chunk0 = fd_dcache_compact_chunk0 ( ctx -> replay_manifest_dcache .wksp , ctx -> replay_manifest_dcache .dcache );
348
+ ctx -> replay_manifest_dcache .chunk = ctx -> replay_manifest_dcache .chunk0 ;
349
+ ctx -> replay_manifest_dcache .wmark = fd_dcache_compact_wmark ( ctx -> replay_manifest_dcache .wksp ,
350
+ ctx -> replay_manifest_dcache .dcache ,
351
+ writer_link -> mtu );
352
+
292
353
/* set up in link */
293
354
fd_topo_link_t const * in_link = & topo -> links [ tile -> in_link_id [ SNAPSHOT_IN_LINK_IDX ] ];
294
355
fd_topo_wksp_t const * in_wksp = & topo -> workspaces [ topo -> objs [ in_link -> dcache_obj_id ].wksp_id ];
@@ -333,50 +394,6 @@ hard_reset_funk( fd_snapin_tile_t * ctx ) {
333
394
/* TODO: Assert that hard reset suceeded */
334
395
}
335
396
336
- static void
337
- send_manifest ( fd_snapin_tile_t * ctx ,
338
- fd_stem_context_t * stem ) {
339
- /* Assumes the manifest is already mem copied into the snap_out
340
- dcache and the replay_manifest_dcache from the save_manifest
341
- callback. */
342
- FD_TEST ( ctx -> manifest_sz );
343
-
344
- ulong sig = 0UL ;
345
- ulong external_sig = 0UL ;
346
- if ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ) {
347
- sig = FD_FULL_SNAPSHOT_MANIFEST ;
348
- external_sig = FD_FULL_SNAPSHOT_MANIFEST_EXTERNAL ;
349
- } else if ( ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL ) {
350
- sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST ;
351
- external_sig = FD_INCREMENTAL_SNAPSHOT_MANIFEST_EXTERNAL ;
352
- }
353
-
354
- /* Send snapshot manifest message over snap_out link */
355
- fd_stem_publish ( stem ,
356
- 0UL ,
357
- sig ,
358
- ctx -> manifest_out .chunk ,
359
- sizeof (fd_snapshot_manifest_t ),
360
- 0UL ,
361
- 0UL ,
362
- 0UL );
363
- ctx -> manifest_out .chunk = fd_dcache_compact_next ( ctx -> manifest_out .chunk ,
364
- sizeof (fd_snapshot_manifest_t ),
365
- ctx -> manifest_out .chunk0 ,
366
- ctx -> manifest_out .wmark );
367
-
368
- /* send manifest over replay manifest dcache */
369
- ulong chunk = fd_dcache_compact_chunk0 ( fd_wksp_containing ( ctx -> replay_manifest_dcache ), ctx -> replay_manifest_dcache );
370
- fd_stem_publish ( stem ,
371
- 0UL ,
372
- external_sig ,
373
- chunk ,
374
- ctx -> manifest_sz ,
375
- 0UL ,
376
- ctx -> replay_manifest_dcache_obj_id ,
377
- 0UL );
378
- }
379
-
380
397
static void
381
398
handle_control_frag ( fd_snapin_tile_t * ctx ,
382
399
fd_stem_context_t * stem ,
@@ -390,10 +407,6 @@ handle_control_frag( fd_snapin_tile_t * ctx,
390
407
0 );
391
408
}
392
409
393
- /* Once the snapshot is fully loaded, we can send the manifest
394
- message over. */
395
- send_manifest ( ctx , stem );
396
-
397
410
/* Notify consumers of manifest out that the snapshot is fully
398
411
loaded. */
399
412
fd_stem_publish ( stem ,
@@ -443,13 +456,16 @@ handle_control_frag( fd_snapin_tile_t * ctx,
443
456
}
444
457
445
458
static void
446
- handle_data_frag ( fd_snapin_tile_t * ctx ,
447
- ulong chunk ,
448
- ulong sz ) {
459
+ handle_data_frag ( fd_snapin_tile_t * ctx ,
460
+ ulong chunk ,
461
+ ulong sz ,
462
+ fd_stem_context_t * stem ) {
449
463
FD_TEST ( ctx -> state == FD_SNAPIN_STATE_LOADING_FULL ||
450
464
ctx -> state == FD_SNAPIN_STATE_LOADING_INCREMENTAL );
451
465
FD_TEST ( chunk >=ctx -> in .chunk0 && chunk <=ctx -> in .wmark );
452
466
467
+ ctx -> stem = stem ;
468
+
453
469
if ( FD_UNLIKELY ( ctx -> parser -> flags & SNAP_FLAG_BLOCKED ||
454
470
ctx -> parser -> flags & SNAP_FLAG_DONE ) ) {
455
471
/* Don't consume the frag if blocked or done */
@@ -518,7 +534,7 @@ after_frag( fd_snapin_tile_t * ctx,
518
534
519
535
/* handle frag */
520
536
if ( FD_UNLIKELY ( sz == 0 ) ) handle_control_frag ( ctx , stem , sig );
521
- else handle_data_frag ( ctx , ctx -> in ._chunk , sz );
537
+ else handle_data_frag ( ctx , ctx -> in ._chunk , sz , stem );
522
538
}
523
539
524
540
#define STEM_BURST 1UL
0 commit comments