Skip to content

Commit

Permalink
refactor(tower): rework tower in-mem repr and cleanup API
Browse files Browse the repository at this point in the history
  • Loading branch information
lidatong committed Jan 9, 2025
1 parent cd81bbe commit 5c0932c
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 262 deletions.
67 changes: 34 additions & 33 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,23 @@ struct fd_replay_tile_ctx {
ulong bank_cnt;
fd_replay_out_ctx_t bank_out[ FD_PACK_MAX_BANK_TILES ];

ulong * wmk; /* publish watermark */
ulong root; /* the root slot is the most recent slot to have reached
max lockout in the tower */

ulong * wmk; /* publish watermark. The watermark is defined as the
minimum of the tower root (root above) and blockstore
smr (blockstore->smr). The watermark is used to
publish our fork-aware structures eg. blockstore,
forks, ghost. In general, publishing has the effect of
pruning minority forks in those structures,
indicating that is ok to release the memory being
occupied by said forks.
The reason it has to be the minimum of the two, is the
tower root can lag the SMR and vice versa, but both
the fork-aware structures need to maintain information
through both of those slots. */

ulong * poh; /* proof-of-history slot */
uint poh_init_done;
int snapshot_init_done;
Expand Down Expand Up @@ -448,7 +464,7 @@ during_frag( fd_replay_tile_ctx_t * ctx,
Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t)) */

ctx->curr_slot = fd_disco_replay_sig_slot( sig );
if( FD_UNLIKELY( ctx->curr_slot < ctx->tower->root ) ) {
if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->wmk ) ) ) {
FD_LOG_WARNING(( "store sent slot %lu before our root.", ctx->curr_slot ));
}
ctx->flags = fd_disco_replay_sig_flags( sig );
Expand All @@ -474,8 +490,8 @@ during_frag( fd_replay_tile_ctx_t * ctx,
Microblock bank trailer
*/
ctx->curr_slot = fd_disco_poh_sig_slot( sig );
if( FD_UNLIKELY( ctx->curr_slot < ctx->tower->root ) ) {
FD_LOG_WARNING(( "pack sent slot %lu before our root %lu.", ctx->curr_slot, ctx->tower->root ));
if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->wmk ) ) ) {
FD_LOG_WARNING(( "pack sent slot %lu before our watermark %lu.", ctx->curr_slot, fd_fseq_query( ctx->wmk ) ));
}
if( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK ) {
ulong bank_idx = fd_disco_poh_sig_bank_tile( sig );
Expand Down Expand Up @@ -930,7 +946,7 @@ static void
send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
if( FD_UNLIKELY( !ctx->vote ) ) return;
FD_LOG_NOTICE( ( "sending tower sync" ) );
ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower->votes )->slot;
ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot;
fd_blockstore_start_read( ctx->blockstore );
fd_hash_t const * vote_bank_hash = fd_blockstore_bank_hash_query( ctx->blockstore, vote_slot );
fd_hash_t const * vote_block_hash = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot );
Expand All @@ -953,7 +969,7 @@ send_tower_sync( fd_replay_tile_ctx_t * ctx ) {

FD_SCRATCH_SCOPE_BEGIN {
fd_txn_p_t * txn = (fd_txn_p_t *)fd_chunk_to_laddr( ctx->sender_out_mem, ctx->sender_out_chunk );
fd_tower_to_vote_txn( ctx->tower, vote_bank_hash, vote_block_hash, ctx->validator_identity, ctx->vote_authority, ctx->vote_acc, txn );
fd_tower_to_vote_txn( ctx->tower, ctx->root, vote_bank_hash, vote_block_hash, ctx->validator_identity, ctx->vote_authority, ctx->vote_acc, txn );
} FD_SCRATCH_SCOPE_END;

/* TODO: Can use a smaller size, adjusted for payload length */
Expand All @@ -974,7 +990,7 @@ send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
ctx->sender_out_wmark );

/* Dump the latest sent tower into the tower checkpoint file */
if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->tower_checkpt_fileno );
if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->root, ctx->tower_checkpt_fileno );
}

static fd_fork_t *
Expand Down Expand Up @@ -1148,13 +1164,13 @@ after_frag( fd_replay_tile_ctx_t * ctx,
ulong parent_slot = ctx->parent_slot;
ulong flags = ctx->flags;
ulong bank_idx = ctx->bank_idx;
if( FD_UNLIKELY( curr_slot < ctx->tower->root ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our root %lu.", curr_slot, parent_slot, ctx->tower->root ));
if( FD_UNLIKELY( curr_slot < fd_fseq_query( ctx->wmk ) ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->wmk ) ));
return;
}

if( FD_UNLIKELY( parent_slot < ctx->tower->root ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our root %lu.", curr_slot, parent_slot, ctx->tower->root ));
if( FD_UNLIKELY( parent_slot < fd_fseq_query( ctx->wmk ) ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->wmk ) ) );
return;
}

Expand Down Expand Up @@ -1377,7 +1393,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,

fd_forks_print( ctx->forks );
fd_ghost_print( ctx->ghost, ctx->epoch, fd_ghost_root( ctx-> ghost ) );
fd_tower_print( ctx->tower );
fd_tower_print( ctx->tower, ctx->root );

ulong vote_slot = fd_tower_vote_slot( ctx->tower, ctx->epoch, ctx->funk, child->slot_ctx.funk_txn, ctx->ghost );

Expand Down Expand Up @@ -1409,27 +1425,11 @@ after_frag( fd_replay_tile_ctx_t * ctx,

/* Vote locally */

fd_tower_vote( ctx->tower, vote_slot );
ulong root = fd_tower_vote( ctx->tower, vote_slot );

/* Check if we've reached max lockout. */
/* Update to a new root, if there is one. */

if( FD_UNLIKELY( fd_tower_is_max_lockout( ctx->tower ) ) ) {

/* Publish tower and get the new root. */

ulong root = fd_tower_publish( ctx->tower );
FD_LOG_NOTICE(( "new tower root: %lu", root ));

/* Note that our local tower root is not used to publish our
fork-aware structures eg. blockstore, forks, ghost.
Instead the SMR is used. The main reason to avoid using
tower root is while starting up, the tower will be loaded
from the vote account state (the "cluster tower") which
might have an earlier root slot than the snapshot slot.
The other structures are initialized to the snapshot
slot. */
}
if ( FD_LIKELY ( root != FD_SLOT_NULL ) ) ctx->root = root; /* optimize for full tower (replay is keeping up) */
}

/* Send our updated tower to the cluster. */
Expand Down Expand Up @@ -1685,7 +1685,8 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
fd_memcpy( key.c, ctx->vote_acc, sizeof(fd_pubkey_t) );
key.c[FD_FUNK_REC_KEY_FOOTPRINT - 1] = FD_FUNK_KEY_TYPE_ACC;
fd_tower_from_vote_acc( ctx->tower, ctx->funk, snapshot_fork->slot_ctx.funk_txn, &key );
fd_tower_print( ctx->tower );
FD_LOG_NOTICE(( "vote account: %s", FD_BASE58_ENC_32_ALLOCA( key.c ) ));
fd_tower_print( ctx->tower, ctx->root );

fd_bank_hash_cmp_t * bank_hash_cmp = ctx->epoch_ctx->bank_hash_cmp;
bank_hash_cmp->total_stake = ctx->epoch->total_stake;
Expand Down Expand Up @@ -1874,7 +1875,7 @@ during_housekeeping( void * _ctx ) {
root and blockstore smr. */

fd_blockstore_start_read( ctx->blockstore );
ulong wmk = fd_ulong_min( ctx->tower->root , ctx->blockstore->smr );
ulong wmk = fd_ulong_min( ctx->root, ctx->blockstore->smr );
fd_blockstore_end_read( ctx->blockstore );

if ( FD_LIKELY( wmk <= fd_fseq_query( ctx->wmk ) ) ) return;
Expand Down
Loading

0 comments on commit 5c0932c

Please sign in to comment.