Skip to content

Revive test_dedup #5744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/app/firedancer-dev/commands/gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ gossip_topo( config_t * config ) {

fd_topos_net_tile_finish( topo, 0UL );
fd_topob_auto_layout( topo, 0 );
topo->agave_affinity_cnt = 0;
fd_topob_finish( topo, CALLBACKS );
fd_topo_print_log( /* stdout */ 1, topo );
}
Expand Down
6 changes: 6 additions & 0 deletions src/app/firedancer-dev/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ extern fd_topo_run_tile_t fd_tile_benchs;
extern fd_topo_run_tile_t fd_tile_bundle;
extern fd_topo_run_tile_t fd_tile_pktgen;
extern fd_topo_run_tile_t fd_tile_udpecho;
extern fd_topo_run_tile_t fd_tile_TDupRx;
extern fd_topo_run_tile_t fd_tile_TDupTx;

extern fd_topo_run_tile_t fd_tile_gossip;
extern fd_topo_run_tile_t fd_tile_repair;
Expand Down Expand Up @@ -156,6 +158,8 @@ fd_topo_run_tile_t * TILES[] = {
&fd_tile_snaprd,
&fd_tile_snapdc,
&fd_tile_snapin,
&fd_tile_TDupRx,
&fd_tile_TDupTx,
NULL,
};

Expand Down Expand Up @@ -185,6 +189,7 @@ extern action_t fd_action_gossip;
extern action_t fd_action_sim;
extern action_t fd_action_backtest;
extern action_t fd_action_snapshot_load;
extern action_t fd_action_test_dedup;

action_t * ACTIONS[] = {
&fd_action_run,
Expand Down Expand Up @@ -213,6 +218,7 @@ action_t * ACTIONS[] = {
&fd_action_sim,
&fd_action_backtest,
&fd_action_snapshot_load,
&fd_action_test_dedup,
NULL,
};

Expand Down
23 changes: 23 additions & 0 deletions src/app/shared/fd_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,29 @@ union fdctl_args {
ushort listen_port;
} udpecho;

/* Arguments of the "test-dedup" action.  Every field is filled in by
   test_dedup_cmd_args from the command-line flag noted next to it
   (the value in parentheses is the default used when the flag is
   absent). */
struct {
char affinity[ AFFINITY_SZ ]; /* --affinity ("auto"); copied as a NUL terminated cstr */

ulong tx_cnt;         /* --tx-cnt (2); rejected if zero */
ulong tx_depth;       /* --tx-depth (32768) */
ulong tx_mtu;         /* --tx-mtu (1472) */
ulong tcache_depth;   /* --tcache-depth (4194302) */
ulong tcache_map_cnt; /* --tcache-map-cnt (0, commented "use default" at the parse site) */
ulong dedup_depth;    /* --dedup-depth (32768) */
ulong dedup_cr_max;   /* --dedup-cr-max (0, "use default") */
long dedup_lazy;      /* --dedup-lazy (0, "use default") */
ulong rx_cnt;         /* --rx-cnt (2); rejected if zero */
ulong test_depth;     /* --test-depth (2046); must not exceed tcache_depth */
ulong test_map_cnt;   /* --test-map-cnt (0, "use default") */
float burst_avg;      /* --burst-avg (1472) */

ulong pkt_payload_max; /* --pkt-payload-max (1472) */
ulong pkt_framing;     /* --pkt-framing (70) */
float pkt_bw;          /* --pkt-bw (25e9) */
float dup_frac;        /* --dup-frac (0.9) */
float dup_avg_age;     /* --dup-avg-age (1e-3 * tcache_depth) */
} test_dedup;

};

typedef union fdctl_args args_t;
Expand Down
3 changes: 1 addition & 2 deletions src/app/shared_dev/commands/pktgen/pktgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pktgen_topo( config_t * config ) {
}
if( FD_LIKELY( !is_auto_affinity ) ) {
if( FD_UNLIKELY( affinity_tile_cnt!=4UL ) )
FD_LOG_ERR(( "Invalid [development.pktgen.affinity]: must include exactly three CPUs" ));
FD_LOG_ERR(( "Invalid [development.pktgen.affinity]: must include exactly 4 CPUs" ));
}

/* Reset topology from scratch */
Expand Down Expand Up @@ -71,7 +71,6 @@ pktgen_topo( config_t * config ) {

fd_topos_net_tile_finish( topo, 0UL );
if( FD_UNLIKELY( is_auto_affinity ) ) fd_topob_auto_layout( topo, 0 );
topo->agave_affinity_cnt = 0;
fd_topob_finish( topo, CALLBACKS );
fd_topo_print_log( /* stdout */ 1, topo );
}
Expand Down
11 changes: 11 additions & 0 deletions src/app/shared_dev/commands/test_dedup/Local.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Adds the test_dedup command and its rx/tx tile objects to the
# fddev_shared library.  Only built when the target is hosted, Linux and
# has AVX; the rx tile's payload check is written with _mm256_*
# intrinsics, which is what the FD_HAS_AVX guard covers.
ifdef FD_HAS_HOSTED
ifdef FD_HAS_LINUX # FIXME why is this needed
ifdef FD_HAS_AVX

$(call add-objs,test_dedup,fddev_shared)
$(call add-objs,test_dedup_rx_tile,fddev_shared)
$(call add-objs,test_dedup_tx_tile,fddev_shared)

endif
endif
endif
116 changes: 116 additions & 0 deletions src/app/shared_dev/commands/test_dedup/test_dedup.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "../../../shared/commands/configure/configure.h" /* CONFIGURE_CMD_INIT */
#include "../../../shared/commands/run/run.h" /* fdctl_check_configure */
#include "../../../../disco/net/fd_net_tile.h"
#include "../../../../disco/topo/fd_topob.h"
#include "../../../../disco/topo/fd_cpu_topo.h"
#include "../../../../util/tile/fd_tile_private.h" /* fd_tile_private_cpus_parse */

#include <unistd.h> /* pause */

extern fd_topo_obj_callbacks_t * CALLBACKS[];

fd_topo_run_tile_t
fdctl_tile_run( fd_topo_tile_t const * tile );

/* test_dedup_topo discards whatever topology is in config->topo and
   rebuilds it as three tiles named "dedup", "TDupTx" and "TDupRx", all
   placed in the "test_dedup" workspace with "metric_in" as their
   metrics workspace.  The finished topology is printed to stdout.

   affinity is either the literal "auto", in which case tile placement
   is delegated to fd_topob_auto_layout, or a CPU list understood by
   fd_tile_private_cpus_parse.  An explicit list must name exactly 3
   CPUs (assigned to dedup, TDupTx, TDupRx in that order) and every
   CPU index must exist on this host; violations terminate via
   FD_LOG_ERR.

   NOTE(review): no links are created between the tiles here, so the
   topology is not yet wired up end to end. */

static void
test_dedup_topo( config_t *   config,
                 char const * affinity ) {
  int is_auto_affinity = !strcmp( affinity, "auto" );

  /* USHORT_MAX marks entries the parser did not assign a CPU to */
  ushort parsed_tile_to_cpu[ FD_TILE_MAX ];
  for( ulong i=0UL; i<FD_TILE_MAX; i++ ) parsed_tile_to_cpu[ i ] = USHORT_MAX;

  fd_topo_cpus_t cpus[1];
  fd_topo_cpus_init( cpus );

  ulong affinity_tile_cnt = 0UL;
  if( FD_LIKELY( !is_auto_affinity ) ) affinity_tile_cnt = fd_tile_private_cpus_parse( affinity, parsed_tile_to_cpu );

  /* Widen the parsed list to ulong, translating the "unassigned"
     sentinel and rejecting CPUs this host does not have */
  ulong tile_to_cpu[ FD_TILE_MAX ] = {0};
  for( ulong i=0UL; i<affinity_tile_cnt; i++ ) {
    if( FD_UNLIKELY( parsed_tile_to_cpu[ i ]!=USHORT_MAX && parsed_tile_to_cpu[ i ]>=cpus->cpu_cnt ) )
      FD_LOG_ERR(( "The --affinity flag specifies a CPU index of %hu, but the system only has %lu CPUs. You should either change the CPU allocations in the affinity string, or increase the number of CPUs in the system.",
                   parsed_tile_to_cpu[ i ], cpus->cpu_cnt ));
    tile_to_cpu[ i ] = fd_ulong_if( parsed_tile_to_cpu[ i ]==USHORT_MAX, ULONG_MAX, (ulong)parsed_tile_to_cpu[ i ] );
  }
  if( FD_LIKELY( !is_auto_affinity ) ) {
    FD_LOG_NOTICE(( "Using --affinity %s", affinity ));
    FD_LOG_NOTICE(( "affinity_tile_cnt %lu", affinity_tile_cnt ));
    if( FD_UNLIKELY( affinity_tile_cnt!=3UL ) )
      FD_LOG_ERR(( "Invalid --affinity: must include exactly 3 CPUs" ));
  }

  /* Reset topology from scratch */
  fd_topo_t * topo = &config->topo;
  fd_topob_new( &config->topo, config->name );
  topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );

  fd_topob_wksp( topo, "test_dedup" );
  fd_topob_wksp( topo, "metric_in"  );

  /* The tile handles returned by fd_topob_tile are not needed (nothing
     is attached to the tiles yet), so they are deliberately not kept;
     holding them in never-read locals trips unused-variable
     diagnostics. */
  fd_topob_tile( topo, "dedup",  "test_dedup", "metric_in", tile_to_cpu[ 0 ], 0, 0 );
  fd_topob_tile( topo, "TDupTx", "test_dedup", "metric_in", tile_to_cpu[ 1 ], 0, 0 );
  fd_topob_tile( topo, "TDupRx", "test_dedup", "metric_in", tile_to_cpu[ 2 ], 0, 0 );

  if( FD_UNLIKELY( is_auto_affinity ) ) fd_topob_auto_layout( topo, 0 );
  fd_topob_finish( topo, CALLBACKS );
  fd_topo_print_log( /* stdout */ 1, topo );
}

/* test_dedup_cmd_args strips the test-dedup flags out of the command
   line (*pargc / *pargv) and stores them in args->test_dedup.  Flags
   that are absent take the defaults given below.  Terminates via
   FD_TEST / FD_LOG_ERR if the affinity string does not fit, if
   --tx-cnt or --rx-cnt is zero, or if --test-depth exceeds
   --tcache-depth. */

void
test_dedup_cmd_args( int *    pargc,
                     char *** pargv,
                     args_t * args ) {

  /* Tile placement */

  char const * affinity = fd_env_strip_cmdline_cstr( pargc, pargv, "--affinity", NULL, "auto" );

  /* Topology sizing (0 selects the "use default" value) */

  args->test_dedup.tx_cnt         = fd_env_strip_cmdline_ulong( pargc, pargv, "--tx-cnt",         NULL, 2UL       );
  args->test_dedup.tx_depth       = fd_env_strip_cmdline_ulong( pargc, pargv, "--tx-depth",       NULL, 32768UL   );
  args->test_dedup.tx_mtu         = fd_env_strip_cmdline_ulong( pargc, pargv, "--tx-mtu",         NULL, 1472UL    );
  args->test_dedup.tcache_depth   = fd_env_strip_cmdline_ulong( pargc, pargv, "--tcache-depth",   NULL, 4194302UL );
  args->test_dedup.tcache_map_cnt = fd_env_strip_cmdline_ulong( pargc, pargv, "--tcache-map-cnt", NULL, 0UL       );
  args->test_dedup.dedup_depth    = fd_env_strip_cmdline_ulong( pargc, pargv, "--dedup-depth",    NULL, 32768UL   );
  args->test_dedup.dedup_cr_max   = fd_env_strip_cmdline_ulong( pargc, pargv, "--dedup-cr-max",   NULL, 0UL       );
  args->test_dedup.dedup_lazy     = fd_env_strip_cmdline_long ( pargc, pargv, "--dedup-lazy",     NULL, 0L        );
  args->test_dedup.rx_cnt         = fd_env_strip_cmdline_ulong( pargc, pargv, "--rx-cnt",         NULL, 2UL       );
  args->test_dedup.test_depth     = fd_env_strip_cmdline_ulong( pargc, pargv, "--test-depth",     NULL, 2046UL    );
  args->test_dedup.test_map_cnt   = fd_env_strip_cmdline_ulong( pargc, pargv, "--test-map-cnt",   NULL, 0UL       );

  /* Synthetic traffic model.  The --dup-avg-age default is derived
     from the tcache depth parsed above, so it must come after it. */

  args->test_dedup.burst_avg       = fd_env_strip_cmdline_float( pargc, pargv, "--burst-avg",       NULL, 1472.f );
  args->test_dedup.pkt_payload_max = fd_env_strip_cmdline_ulong( pargc, pargv, "--pkt-payload-max", NULL, 1472UL );
  args->test_dedup.pkt_framing     = fd_env_strip_cmdline_ulong( pargc, pargv, "--pkt-framing",     NULL, 70UL   );
  args->test_dedup.pkt_bw          = fd_env_strip_cmdline_float( pargc, pargv, "--pkt-bw",          NULL, 25e9f  );
  args->test_dedup.dup_frac        = fd_env_strip_cmdline_float( pargc, pargv, "--dup-frac",        NULL, 0.9f   );
  args->test_dedup.dup_avg_age     = fd_env_strip_cmdline_float( pargc, pargv, "--dup-avg-age",     NULL,
                                                                 1e-3f*(float)args->test_dedup.tcache_depth );

  /* Keep a private copy of the affinity string (must fit with its
     terminating NUL) */

  FD_TEST( strlen( affinity )<sizeof(args->test_dedup.affinity) );
  char * dst = fd_cstr_init( args->test_dedup.affinity );
  dst        = fd_cstr_append_cstr( dst, affinity );
  fd_cstr_fini( dst );

  /* Sanity checks */

  if( FD_UNLIKELY( args->test_dedup.tx_cnt==0UL ) ) FD_LOG_ERR(( "tx_cnt should be positive" ));
  if( FD_UNLIKELY( args->test_dedup.rx_cnt==0UL ) ) FD_LOG_ERR(( "rx_cnt should be positive" ));
  if( FD_UNLIKELY( args->test_dedup.tcache_depth<args->test_dedup.test_depth ) )
    FD_LOG_ERR(( "--test-depth should be at most --tcache-depth" ));
}

/* test_dedup_cmd_fn is the entry point of the "test-dedup" action.  It
   builds the test topology for the requested affinity, runs the
   hugetlbfs configure stage in init mode, checks the configuration,
   initializes and joins the workspaces read-write, and then runs every
   tile inside this process.  It never returns: the calling thread
   blocks in pause() forever once the tiles are started. */

void
test_dedup_cmd_fn( args_t *   args,
                   config_t * config ) {
  fd_topo_t * topo = &config->topo;

  test_dedup_topo( config, args->test_dedup.affinity );

  configure_stage( &fd_cfg_stage_hugetlbfs, CONFIGURE_CMD_INIT, config );
  fdctl_check_configure( config );

  initialize_workspaces( config );
  initialize_stacks    ( config );
  fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_WRITE );

  /* FIXME allow running sandboxed/multiprocess */
  fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run );

  while( 1 ) pause();
}

/* Action descriptor for the "test-dedup" subcommand: binds the command
   name to its argument parser and its run function. */
action_t fd_action_test_dedup = {
  .name        = "test-dedup",
  .description = "Test the dedup tile",
  .args        = test_dedup_cmd_args,
  .fn          = test_dedup_cmd_fn,
};
145 changes: 145 additions & 0 deletions src/app/shared_dev/commands/test_dedup/test_dedup_rx_tile.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "../../../../disco/topo/fd_topo.h"

/* test_dedup_rx_ctx_t is the per-tile state of the TDupRx tile. */

typedef struct test_dedup_rx_ctx {
  fd_rng_t rng[1];          /* Joined at init from the tile's rng_seq */

  void *   rx_base;         /* Base used to turn a frag chunk index into a local address */

  /* Local view of the tile's tcache (filled in from the fd_tcache
     accessors at init) */
  ulong    tcache_depth;
  ulong    tcache_map_cnt;
  ulong *  _tcache_sync;    /* Location of the shared sync word ... */
  ulong    tcache_sync;     /* ... and the local copy, written back during housekeeping */
  ulong *  _tcache_ring;
  ulong *  _tcache_map;

  /* Throughput diagnostics */
  ulong    diag_iter;       /* Frags accepted since the last report */
  long     diag_last_ts;    /* fd_log_wallclock() value at the last report */
  long     diag_interval;   /* Not referenced by the code in this file */

  uint     corrupt : 1;     /* Set by during_frag on payload mismatch, checked by after_frag */
} test_dedup_rx_ctx_t;

/* during_housekeeping writes the local tcache sync value back to the
   shared sync word and, if more than one second (1e9 wallclock ns) has
   passed since the previous report, logs the receive rate in Mfrag/s
   and restarts the measurement window. */

static void
during_housekeeping( test_dedup_rx_ctx_t * ctx ) {

  /* Update synchronization info (fenced so the compiler keeps the
     store where it is) */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( *ctx->_tcache_sync ) = ctx->tcache_sync;
  FD_COMPILER_MFENCE();

  /* Send diagnostic info */
  long ts_now  = fd_log_wallclock();
  long elapsed = ts_now - ctx->diag_last_ts;
  if( FD_LIKELY( elapsed <= (long)1e9 ) ) return; /* window still open */

  /* frags per ns times 1e3 is frags per us, i.e. Mfrag/s */
  float rate = (1e3f*(float)ctx->diag_iter) / (float)elapsed;
  FD_LOG_NOTICE(( "%7.3f Mfrag/s rx", (double)rate ));

  ctx->diag_last_ts = ts_now;
  ctx->diag_iter    = 0UL;
}

/* during_frag handles one incoming fragment.  The frag signature sig
   is inserted into the tile's tcache; seeing a signature that is
   already cached is fatal ("Received a duplicate").  The payload at
   chunk is then compared, 128 bytes per iteration, against sig
   replicated into every 64-bit word, and the outcome is recorded in
   ctx->corrupt for after_frag to act on.

   in_idx, seq and ctl are unused.

   NOTE(review): the loop always consumes whole 128 byte groups, so a
   sz that is not a multiple of 128 reads past sz -- presumably the
   transmitter pads its payloads; confirm against the tx tile.
   _mm256_load_si256 is the aligned load form, so the payload is
   assumed to be 32 byte aligned -- confirm. */

static inline void
during_frag( test_dedup_rx_ctx_t * ctx,
             ulong                 in_idx,
             ulong                 seq,
             ulong                 sig,
             ulong                 chunk,
             ulong                 sz,
             ulong                 ctl ) {
  (void)in_idx; (void)seq; (void)ctl;

  /* Process the received fragment (FIXME: also validate continuity of
     the individual tx streams via sig too, validate control bits, add
     latency and bandwidth stats). */

  int is_dup;
  FD_TCACHE_INSERT( is_dup, ctx->tcache_sync, ctx->_tcache_ring, ctx->tcache_depth, ctx->_tcache_map, ctx->tcache_map_cnt, sig );
  if( FD_UNLIKELY( is_dup ) ) FD_LOG_ERR(( "Received a duplicate" ));

  uchar const * payload  = (uchar const *)fd_chunk_to_laddr_const( ctx->rx_base, chunk );
  __m256i       expected = _mm256_set1_epi64x( (long)sig );

  /* Four independent accumulators, one per 32 byte lane of each 128
     byte group.  A lane stays at -1 (all bits set) only while every
     byte seen so far matched. */
  int eq0 = -1;
  int eq1 = -1;
  int eq2 = -1;
  int eq3 = -1;
  for( ulong off=0UL; off<sz; off+=128UL ) {
    uchar const * grp = payload + off;
    eq0 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i *) grp       ), expected ) );
    eq1 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i *)(grp+32UL) ), expected ) );
    eq2 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i *)(grp+64UL) ), expected ) );
    eq3 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i *)(grp+96UL) ), expected ) );
  }

  /* Validate that the frag payload was as expected */
  ctx->corrupt = ((eq0 & eq1 & eq2 & eq3)!=-1);
}

/* after_frag finishes handling of a fragment: it aborts with
   "Corrupt payload received" if during_frag flagged a payload
   mismatch, and otherwise counts the fragment towards the throughput
   diagnostic.  Only ctx is used; all other arguments are ignored. */

static inline void
after_frag( test_dedup_rx_ctx_t * ctx,
            ulong                 in_idx,
            ulong                 in_seq,
            ulong                 in_sig,
            ulong                 in_sz,
            ulong                 in_tsorig,
            ulong                 in_tspub,
            fd_stem_context_t *   stem ) {
  (void)in_idx;
  (void)in_seq;
  (void)in_sig;
  (void)in_sz;
  (void)in_tsorig;
  (void)in_tspub;
  (void)stem;

  if( FD_UNLIKELY( ctx->corrupt ) ) {
    FD_LOG_ERR(( "Corrupt payload received" ));
  }

  ctx->diag_iter += 1UL;
}

/* Stem template configuration.  The defines below hand this tile's
   context type, its alignment and the three callbacks defined above to
   fd_stem.c, which is then included as a template.  stem_run (used as
   this tile's run hook) is not defined anywhere else in this file, so
   it is presumably generated by that include -- confirm in fd_stem.c.
   The defines must stay ahead of the include. */
#define STEM_BURST 1
#define STEM_LAZY ((long)2e6)
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(test_dedup_rx_ctx_t)
#define STEM_CALLBACK_CONTEXT_TYPE test_dedup_rx_ctx_t
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#include "../../../../disco/stem/fd_stem.c"

static ulong
scratch_align( void ) {
return fd_ulong_max( alignof(test_dedup_rx_ctx_t), fd_tcache_align() );
}

/* scratch_footprint returns the number of bytes of scratch this tile
   needs: the context struct followed by a tcache sized from the tile's
   test_dedup_rx.tcache_depth / tcache_map_cnt settings, rounded up to
   scratch_align(). */

static ulong
scratch_footprint( fd_topo_tile_t const * tile ) {
  ulong tcache_sz = fd_tcache_footprint( tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt );

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(test_dedup_rx_ctx_t), sizeof(test_dedup_rx_ctx_t) );
  l = FD_LAYOUT_APPEND( l, fd_tcache_align(),            tcache_sz                   );
  return FD_LAYOUT_FINI( l, scratch_align() );
}

/* unprivileged_init carves the tile's scratch region into the context
   struct and a tcache (same layout as scratch_footprint), zeroes the
   context, creates and joins the tcache, and caches the tcache
   parameters in the context.  It also records the base address used to
   resolve incoming frag chunks, starts the diagnostic window, and
   seeds the rng from the tile's rng_seq.  Requires at least one input
   link; any failure terminates via FD_TEST. */

static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  FD_TEST( tile->in_cnt>0 );

  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  test_dedup_rx_ctx_t * ctx        = FD_SCRATCH_ALLOC_APPEND( l, alignof(test_dedup_rx_ctx_t), sizeof(test_dedup_rx_ctx_t) );
  void *                tcache_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tcache_align(), fd_tcache_footprint( tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt ) );
  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );

  memset( ctx, 0, sizeof(test_dedup_rx_ctx_t) );

  /* during_frag resolves frag chunks relative to rx_base, so it must
     point at the workspace backing the input link's dcache.  Leaving
     it at the zero written by the memset above would make every
     payload read start from a NULL base.
     NOTE(review): only input 0 is consulted; further inputs are
     assumed to live in the same workspace -- confirm once the
     topology is wired up. */
  fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
  ctx->rx_base = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
  FD_TEST( ctx->rx_base );

  fd_tcache_t * tcache = fd_tcache_join( fd_tcache_new( tcache_mem, tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt ) );
  FD_TEST( tcache );

  ctx->tcache_depth   = fd_tcache_depth       ( tcache );
  ctx->tcache_map_cnt = fd_tcache_map_cnt     ( tcache );
  ctx->_tcache_sync   = fd_tcache_oldest_laddr( tcache );
  ctx->_tcache_ring   = fd_tcache_ring_laddr  ( tcache );
  ctx->_tcache_map    = fd_tcache_map_laddr   ( tcache );
  ctx->tcache_sync    = *ctx->_tcache_sync;

  /* Open the first diagnostic window now.  With diag_last_ts left at
     zero the first housekeeping pass would divide the frag count by
     the full wallclock value and log a meaningless rate. */
  ctx->diag_iter    = 0UL;
  ctx->diag_last_ts = fd_log_wallclock();

  FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, tile->test_dedup_rx.rng_seq, 0UL ) ) );
}

/* Tile descriptor for the "TDupRx" tile: scratch sizing hooks, the
   unprivileged init hook, and stem_run as the run loop. */
fd_topo_run_tile_t fd_tile_TDupRx = {
  .name              = "TDupRx",
  .run               = stem_run,
  .unprivileged_init = unprivileged_init,
  .scratch_footprint = scratch_footprint,
  .scratch_align     = scratch_align,
};
Loading
Loading