Skip to content

Commit

Permalink
tpu: restructure how txns are passed along
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Jan 6, 2025
1 parent 8fc7d4f commit 09d4fd6
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 196 deletions.
127 changes: 39 additions & 88 deletions src/app/fdctl/run/tiles/fd_dedup.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
checks the transaction signature field for duplicates and filters
them out. */

#define GOSSIP_IN_IDX (0UL) /* Frankendancer and Firedancer */
#define VOTER_IN_IDX (1UL) /* Firedancer only */
#define IN_KIND_GOSSIP (0UL)
#define IN_KIND_VOTER (1UL)
#define IN_KIND_VERIFY (2UL)

/* fd_dedup_in_ctx_t is a context object for each in (producer) mcache
connected to the dedup tile. */
Expand All @@ -38,9 +39,7 @@ typedef struct {
ulong * tcache_ring;
ulong * tcache_map;

/* The first unparsed_in_cnt in links do not have the parsed fd_txn_t
in the payload trailer. */
ulong unparsed_in_cnt;
ulong in_kind[ 64UL ];
fd_dedup_in_ctx_t in[ 64UL ];

fd_wksp_t * out_mem;
Expand Down Expand Up @@ -102,13 +101,21 @@ during_frag( fd_dedup_ctx_t * ctx,
(void)seq;
(void)sig;

if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz > FD_TPU_DCACHE_MTU ) )
if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_TPU_PARSED_MTU ) )
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in[in_idx].mem, chunk );
uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk );
uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );

fd_memcpy( dst, src, sz );
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP || ctx->in_kind[ in_idx ]==IN_KIND_VOTER ) ) {
if( FD_UNLIKELY( sz>FD_TPU_MTU ) ) FD_LOG_ERR(( "received a gossip or voter transaction that was too large" ));

fd_txn_m_t * txnm = (fd_txn_m_t *)dst;
txnm->payload_sz = (ushort)sz;
fd_memcpy( fd_txn_m_payload( txnm ), src, sz );
} else {
fd_memcpy( dst, src, sz );
}
}

/* After the transaction has been fully received, and we know we were
Expand All @@ -128,72 +135,35 @@ after_frag( fd_dedup_ctx_t * ctx,
fd_stem_context_t * stem ) {
(void)seq;
(void)sig;
(void)sz;

/* Transactions coming from verify tile, already parsed.
We need to reconstruct fd_txn_t * txn, because we need the
tx signature to compute the dedup tag.
To find the position of fd_txn_t * txn, we need the (udp)
payload_sz that's stored as ushort at the end of the
dcache_entry. */
uchar * dcache_entry = fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
ushort * payload_sz_p = (ushort *)(dcache_entry + sz - sizeof(ushort));
ulong payload_sz = *payload_sz_p;
ulong txn_off = fd_ulong_align_up( payload_sz, 2UL );
fd_txn_t * txn = (fd_txn_t *)(dcache_entry + txn_off);

if ( FD_UNLIKELY( in_idx < ctx->unparsed_in_cnt ) ) {
fd_txn_m_t * txnm = (fd_txn_m_t *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
fd_txn_t * txn = fd_txn_m_txn_t( txnm );

if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP || ctx->in_kind[ in_idx]==IN_KIND_VOTER ) ) {
/* Transactions coming in from these links are not parsed.
We'll need to parse them so they're ready for downstream consumers.
Equally importantly, we need to parse them to extract the signature
for dedup. Just parse each one right into the output dcache. */
txnm->txn_t_sz = (ushort)fd_txn_parse( fd_txn_m_payload( txnm ), txnm->payload_sz, txn, NULL );
if( FD_UNLIKELY( !txnm->txn_t_sz ) ) FD_LOG_ERR(( "fd_txn_parse failed for vote transactions that should have been sigverified" ));

/* Here, sz is the size of the udp payload, as the txn has not
been parsed yet. Code here is similar to the verify tile. */
ulong payload_sz = sz;
ulong txn_off = fd_ulong_align_up( payload_sz, 2UL );

/* Ensure sufficient trailing space for parsing. */
if( FD_UNLIKELY( txn_off>FD_TPU_DCACHE_MTU - FD_TXN_MAX_SZ - sizeof(ushort)) ) {
FD_LOG_ERR(( "got malformed txn (sz %lu) insufficient space left in dcache for fd_txn_t", payload_sz ));
}

txn = (fd_txn_t *)(dcache_entry + txn_off);
ulong txn_t_sz = fd_txn_parse( dcache_entry, payload_sz, txn, NULL );
if( FD_UNLIKELY( !txn_t_sz ) ) FD_LOG_ERR(( "fd_txn_parse failed for vote transactions that should have been sigverified" ));

/* Increment on GOSSIP_IN_IDX but not VOTER_IN_IDX */
FD_MCNT_INC( DEDUP, GOSSIPED_VOTES_RECEIVED, 1UL - in_idx );

/* Write payload_sz into trailer.
fd_txn_parse always returns a multiple of 2 so this sz is
correctly aligned. */
payload_sz_p = (ushort *)((ulong)txn + txn_t_sz);
*payload_sz_p = (ushort)payload_sz;

/* End of parsed message. */

/* Paranoia post parsing. */
ulong new_sz = ( (ulong)payload_sz_p + sizeof(ushort) ) - (ulong)dcache_entry;
if( FD_UNLIKELY( new_sz>FD_TPU_DCACHE_MTU ) ) {
FD_LOG_CRIT(( "memory corruption detected (txn_sz=%lu txn_t_sz=%lu)",
payload_sz, txn_t_sz ));
}

/* Write new size for mcache. */
sz = new_sz;
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) FD_MCNT_INC( DEDUP, GOSSIPED_VOTES_RECEIVED, 1UL );
}

/* Compute fd_hash(signature) for dedup. */
ulong ha_dedup_tag = fd_hash( ctx->hashmap_seed, dcache_entry + txn->signature_off, 64UL );
ulong ha_dedup_tag = fd_hash( ctx->hashmap_seed, fd_txn_m_payload( txnm )+txn->signature_off, 64UL );

int is_dup;
FD_TCACHE_INSERT( is_dup, *ctx->tcache_sync, ctx->tcache_ring, ctx->tcache_depth, ctx->tcache_map, ctx->tcache_map_cnt, ha_dedup_tag );
if( FD_LIKELY( is_dup ) ) {
ctx->metrics.dedup_fail_cnt++;
} else {
fd_stem_publish( stem, 0UL, 0, ctx->out_chunk, sz, 0UL, tsorig, 0UL );
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, sz, ctx->out_chunk0, ctx->out_wmark );
ulong realized_sz = fd_txn_m_realized_footprint( txnm, 0 );
ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_stem_publish( stem, 0UL, 0, ctx->out_chunk, realized_sz, 0UL, tsorig, tspub );
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, realized_sz, ctx->out_chunk0, ctx->out_wmark );
}
}

Expand All @@ -212,34 +182,6 @@ unprivileged_init( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

/* Frankendancer has gossip_dedup, verify_dedup+
Firedancer has gossip_dedup, voter_dedup, verify_dedup+ */
ulong unparsed_in_cnt = 1;
if( FD_UNLIKELY( tile->in_cnt<2UL ) ) {
FD_LOG_ERR(( "dedup tile needs at least two input links, got %lu", tile->in_cnt ));
} else if( FD_UNLIKELY( strcmp( topo->links[ tile->in_link_id[ GOSSIP_IN_IDX ] ].name, "gossip_dedup" ) ) ) {
/* We have one link for gossip messages... */
FD_LOG_ERR(( "dedup tile has unexpected input links %lu %lu %s",
tile->in_cnt, GOSSIP_IN_IDX, topo->links[ tile->in_link_id[ GOSSIP_IN_IDX ] ].name ));
} else {
/* ...followed by a voter_dedup link if it were the Firedancer topology */
ulong voter_dedup_idx = fd_topo_find_tile_in_link( topo, tile, "voter_dedup", 0 );
if( voter_dedup_idx!=ULONG_MAX ) {
FD_TEST( voter_dedup_idx == VOTER_IN_IDX );
unparsed_in_cnt = 2;
} else {
unparsed_in_cnt = 1;
}

/* ...followed by a sequence of verify_dedup links */
for( ulong i=unparsed_in_cnt; i<tile->in_cnt; i++ ) {
if( FD_UNLIKELY( strcmp( topo->links[ tile->in_link_id[ i ] ].name, "verify_dedup" ) ) ) {
FD_LOG_ERR(( "dedup tile has unexpected input links %lu %lu %s",
tile->in_cnt, i, topo->links[ tile->in_link_id[ i ] ].name ));
}
}
}

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_dedup_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_dedup_ctx_t ), sizeof( fd_dedup_ctx_t ) );
fd_tcache_t * tcache = fd_tcache_join( fd_tcache_new( FD_SCRATCH_ALLOC_APPEND( l, fd_tcache_align(), fd_tcache_footprint( tile->dedup.tcache_depth, 0) ), tile->dedup.tcache_depth, 0 ) );
Expand All @@ -252,14 +194,23 @@ unprivileged_init( fd_topo_t * topo,
ctx->tcache_map = fd_tcache_map_laddr ( tcache );

FD_TEST( tile->in_cnt<=sizeof( ctx->in )/sizeof( ctx->in[ 0 ] ) );
ctx->unparsed_in_cnt = unparsed_in_cnt;
for( ulong i=0; i<tile->in_cnt; i++ ) {
for( ulong i=0UL; i<tile->in_cnt; i++ ) {
fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];

ctx->in[i].mem = link_wksp->wksp;
ctx->in[i].chunk0 = fd_dcache_compact_chunk0( ctx->in[i].mem, link->dcache );
ctx->in[i].wmark = fd_dcache_compact_wmark ( ctx->in[i].mem, link->dcache, link->mtu );

if( FD_UNLIKELY( !strcmp( link->name, "gossip_dedup" ) ) ) {
ctx->in_kind[ i ] = IN_KIND_GOSSIP;
} else if( FD_UNLIKELY( !strcmp( link->name, "voter_dedup" ) ) ) {
ctx->in_kind[ i ] = IN_KIND_VOTER;
} else if( FD_UNLIKELY( !strcmp( link->name, "verify_dedup" ) ) ) {
ctx->in_kind[ i ] = IN_KIND_VERIFY;
} else {
FD_LOG_ERR(( "unexpected link name %s", link->name ));
}
}

ctx->out_mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp;
Expand Down
33 changes: 10 additions & 23 deletions src/app/fdctl/run/tiles/fd_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ during_frag( fd_pack_ctx_t * ctx,
return;
}
case IN_KIND_RESOLV: {
if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_TPU_RESOLVED_DCACHE_MTU ) )
if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_TPU_RESOLVED_MTU ) )
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

if( FD_UNLIKELY( (ctx->leader_slot==ULONG_MAX) & (sig>ctx->highest_observed_slot) ) ) {
Expand Down Expand Up @@ -676,33 +676,20 @@ during_frag( fd_pack_ctx_t * ctx,
ctx->cur_spot = fd_pack_insert_txn_init( ctx->pack );
#endif

ulong payload_sz;
/* We get transactions from the resolv tile.
The transactions should have been parsed and verified. */
FD_MCNT_INC( PACK, NORMAL_TRANSACTION_RECEIVED, 1UL );
/* Assume that the dcache entry is:
Payload ....... (payload_sz bytes)
0 or 1 byte of padding (since alignof(fd_txn) is 2)
fd_txn ....... (size computed by fd_txn_footprint)
Optionally:
0 to 30 bytes of padding
expanded alt (32*txn->addr_table_adtl_cnt) bytes
payload_sz (2B)
mline->sz includes all three or four fields and the padding */
payload_sz = *(ushort*)(dcache_entry + sz - sizeof(ushort));
uchar const * payload = dcache_entry;
fd_txn_t const * txn = (fd_txn_t const *)( dcache_entry + fd_ulong_align_up( payload_sz, 2UL ) );
ulong txn_footprint = fd_txn_footprint( txn->instr_cnt, txn->addr_table_lookup_cnt );
fd_acct_addr_t const * alt = (fd_acct_addr_t const *)( (uchar const *)dcache_entry + fd_ulong_align_up( fd_ulong_align_up( payload_sz, 2UL ) + txn_footprint, 32UL ) );
fd_memcpy( ctx->cur_spot->txnp->payload, payload, payload_sz );
fd_memcpy( TXN(ctx->cur_spot->txnp), txn, txn_footprint );
fd_memcpy( ctx->cur_spot->alt_accts, alt, 32UL*txn->addr_table_adtl_cnt );
ctx->cur_spot->txnp->payload_sz = payload_sz;

fd_txn_m_t * txnm = (fd_txn_m_t *)dcache_entry;
fd_txn_t * txn = fd_txn_m_txn_t( txnm );

fd_memcpy( ctx->cur_spot->txnp->payload, fd_txn_m_payload( txnm), txnm->payload_sz );
fd_memcpy( TXN(ctx->cur_spot->txnp), txn, txnm->txn_t_sz );
fd_memcpy( ctx->cur_spot->alt_accts, fd_txn_m_alut( txnm ), 32UL*txn->addr_table_adtl_cnt );
ctx->cur_spot->txnp->payload_sz = txnm->payload_sz;

#if DETAILED_LOGGING
FD_LOG_NOTICE(( "Pack got a packet. Payload size: %lu, txn footprint: %lu", payload_sz,
fd_txn_footprint( txn->instr_cnt, txn->addr_table_lookup_cnt )
));
FD_LOG_NOTICE(( "Pack got a packet. Payload size: %lu, txn footprint: %lu", txnm->payload_sz, txnm->txn_t_sz ));
#endif
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
txn->payload );
} FD_SCRATCH_SCOPE_END;

/* TODO: Can use a smaller size, adjusted for payload length */
ulong msg_sz = sizeof( fd_txn_p_t );
fd_mcache_publish( ctx->sender_out_mcache,
ctx->sender_out_depth,
Expand Down
31 changes: 12 additions & 19 deletions src/app/fdctl/run/tiles/fd_resolv.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ after_frag( fd_resolv_ctx_t * ctx,
fd_stem_context_t * stem ) {
(void)seq;
(void)sig;
(void)sz;

if( FD_UNLIKELY( ctx->in[in_idx].kind==FD_RESOLV_IN_KIND_BANK ) ) {
switch( sig ) {
Expand Down Expand Up @@ -193,11 +194,8 @@ after_frag( fd_resolv_ctx_t * ctx,
return;
}

uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->out_mem, ctx->out_chunk );

ulong payload_sz = *(ushort*)(dcache_entry + sz - sizeof(ushort));
uchar const * payload = dcache_entry;
fd_txn_t const * txn = (fd_txn_t const *)( dcache_entry + fd_ulong_align_up( payload_sz, 2UL ) );
fd_txn_m_t * txnm = (fd_txn_m_t *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
fd_txn_t const * txnt = fd_txn_m_txn_t( txnm );

/* If we can't find the recent blockhash ... it means one of three things,
Expand All @@ -211,40 +209,35 @@ after_frag( fd_resolv_ctx_t * ctx,
introduce a holding area here to keep them until we know if they
are valid or not. */

ulong reference_slot = ctx->completed_slot;
blockhash_map_t const * blockhash = map_query_const( ctx->blockhash_map, *(blockhash_t*)( payload+txn->recent_blockhash_off ), NULL );
txnm->reference_slot = ctx->completed_slot;
blockhash_map_t const * blockhash = map_query_const( ctx->blockhash_map, *(blockhash_t*)( fd_txn_m_payload( txnm )+txnt->recent_blockhash_off ), NULL );
if( FD_LIKELY( blockhash ) ) {
reference_slot = blockhash->slot;
if( FD_UNLIKELY( reference_slot+151UL<ctx->completed_slot ) ) {
txnm->reference_slot = blockhash->slot;
if( FD_UNLIKELY( txnm->reference_slot+151UL<ctx->completed_slot ) ) {
ctx->metrics.blockhash_expired++;
return;
}
} else {
ctx->metrics.blockhash_unknown++;
}

if( FD_UNLIKELY( txn->addr_table_adtl_cnt ) ) {
if( FD_UNLIKELY( txnt->addr_table_adtl_cnt ) ) {
if( FD_UNLIKELY( !ctx->root_bank ) ) {
FD_MCNT_INC( RESOLV, NO_BANK_DROP, 1 );
return;
}

ulong txn_t_sz = fd_ulong_align_up( fd_ulong_align_up( payload_sz, 2UL ) + fd_txn_footprint( txn->instr_cnt, txn->addr_table_lookup_cnt ), 32UL );
fd_acct_addr_t * lut_accts = (fd_acct_addr_t*)(dcache_entry+txn_t_sz);
ushort * next_payload_sz = (ushort*)(dcache_entry+txn_t_sz+txn->addr_table_adtl_cnt*sizeof(fd_acct_addr_t));
int result = fd_bank_abi_resolve_address_lookup_tables( ctx->root_bank, 0, ctx->root_slot, txn, payload, lut_accts );
int result = fd_bank_abi_resolve_address_lookup_tables( ctx->root_bank, 0, ctx->root_slot, txnt, fd_txn_m_payload( txnm ), fd_txn_m_alut( txnm ) );
/* result is in [-5, 0]. We want to map -5 to 0, -4 to 1, etc. */
ctx->metrics.lut[ (ulong)((long)FD_METRICS_COUNTER_RESOLV_LUT_RESOLVED_CNT+result-1L) ]++;

if( FD_UNLIKELY( result!=FD_BANK_ABI_TXN_INIT_SUCCESS ) ) return;

*next_payload_sz = (ushort)payload_sz;
sz = txn_t_sz+txn->addr_table_adtl_cnt*sizeof(fd_acct_addr_t)+sizeof(ushort);
}

ulong realized_sz = fd_txn_m_realized_footprint( txnm, 1 );
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
fd_stem_publish( stem, 0UL, reference_slot, ctx->out_chunk, sz, 0UL, tsorig, tspub );
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, sz, ctx->out_chunk0, ctx->out_wmark );
fd_stem_publish( stem, 0UL, txnm->reference_slot, ctx->out_chunk, realized_sz, 0UL, tsorig, tspub );
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, realized_sz, ctx->out_chunk0, ctx->out_wmark );
}

static void
Expand Down
Loading

0 comments on commit 09d4fd6

Please sign in to comment.