#include "../../../../disco/topo/fd_topo.h"

#include <immintrin.h> /* AVX intrinsics (_mm256_*) used by the payload check below */

struct test_dedup_rx_ctx {
  fd_rng_t rng[1];          /* seeded in unprivileged_init (not yet used on the frag path) */

  void *   rx_base;         /* workspace base address for chunk -> laddr translation */

  ulong    tcache_depth;    /* cached fd_tcache_depth  ( tcache ) */
  ulong    tcache_map_cnt;  /* cached fd_tcache_map_cnt( tcache ) */
  ulong *  _tcache_sync;    /* shared location of the tcache oldest counter */
  ulong    tcache_sync;     /* local copy, published to _tcache_sync in housekeeping */
  ulong *  _tcache_ring;
  ulong *  _tcache_map;

  ulong diag_iter;          /* frags received since the last diagnostic report */
  long  diag_last_ts;       /* wallclock (ns) of the last diagnostic report */
  long  diag_interval;      /* reserved for a configurable report interval (currently unused) */

  uint corrupt : 1;         /* set by during_frag if the payload failed validation */
};

typedef struct test_dedup_rx_ctx test_dedup_rx_ctx_t;

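/* FD_TCACHE_INSERT below advances the tcache's oldest counter through
   ctx->tcache_sync, a local copy that can stay register / L1 resident
   on the frag path; during_housekeeping flushes it to the shared
   location _tcache_sync so observers see forward progress without the
   hot path touching shared memory on every frag. */
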
static void
during_housekeeping( test_dedup_rx_ctx_t * ctx ) {
  /* Publish the local tcache sync (oldest) counter */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( *ctx->_tcache_sync ) = ctx->tcache_sync;
  FD_COMPILER_MFENCE();

  /* Send diagnostic info roughly once a second */
  long now = fd_log_wallclock();
  long dt  = now - ctx->diag_last_ts;
  if( FD_UNLIKELY( dt > (long)1e9 ) ) {
    float mfps = (1e3f*(float)ctx->diag_iter) / (float)dt;
    FD_LOG_NOTICE(( "%7.3f Mfrag/s rx", (double)mfps ));
    ctx->diag_last_ts = now;
    ctx->diag_iter    = 0UL;
  }
}

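/* Units for the rate above: diag_iter frags over dt ns is
   (1e9*diag_iter/dt) frag/s; dividing by 1e6 to convert to Mfrag/s
   leaves the single 1e3 factor used in the expression. */
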
static inline void
during_frag( test_dedup_rx_ctx_t * ctx,
             ulong                 in_idx,
             ulong                 seq,
             ulong                 sig,
             ulong                 chunk,
             ulong                 sz,
             ulong                 ctl ) {
  (void)in_idx; (void)seq; (void)ctl;

  /* Process the received fragment (FIXME: also validate continuity of
     the individual tx streams via sig too, validate control bits, add
     latency and bandwidth stats). */

  int is_dup;
  FD_TCACHE_INSERT( is_dup, ctx->tcache_sync, ctx->_tcache_ring, ctx->tcache_depth, ctx->_tcache_map, ctx->tcache_map_cnt, sig );
  if( FD_UNLIKELY( is_dup ) ) FD_LOG_ERR(( "Received a duplicate" ));

  /* The expected payload is the 8-byte little-endian encoding of sig
     repeated over the frag.  Fold byte-granular compares into four
     running movemasks; a mask stays -1 iff every byte it covered
     matched.  Note the aligned loads require the payload to be 32-byte
     aligned, and the loop reads sz rounded up to a multiple of 128
     bytes, so the tx side is assumed to size frags accordingly. */
  uchar const * p = (uchar const *)fd_chunk_to_laddr_const( ctx->rx_base, chunk );
  __m256i avx = _mm256_set1_epi64x( (long)sig );
  int mask0 = -1;
  int mask1 = -1;
  int mask2 = -1;
  int mask3 = -1;
  for( ulong off=0UL; off<sz; off+=128UL ) {
    mask0 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i const *) p        ), avx ) );
    mask1 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i const *)(p+32UL) ), avx ) );
    mask2 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i const *)(p+64UL) ), avx ) );
    mask3 &= _mm256_movemask_epi8( _mm256_cmpeq_epi8( _mm256_load_si256( (__m256i const *)(p+96UL) ), avx ) );
    p += 128UL;
  }

  /* Record whether the frag payload was as expected; acted on in
     after_frag */
  ctx->corrupt = ((mask0 & mask1 & mask2 & mask3)!=-1);
}

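/* For reference, a scalar equivalent of the vectorized check above (a
   sketch under the same sig-repeated little-endian payload assumption,
   with q the unadvanced frag start):

     int ok = 1;
     for( ulong off=0UL; off<sz; off++ )
       ok &= (int)( q[ off ]==(uchar)( sig >> (8UL*(off & 7UL)) ) );
     ctx->corrupt = !ok;

   The AVX path checks 128 bytes per iteration and defers the verdict
   to a single combine, keeping branches out of the hot loop. */
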
static inline void
after_frag( test_dedup_rx_ctx_t * ctx,
            ulong                 in_idx,
            ulong                 in_seq,
            ulong                 in_sig,
            ulong                 in_sz,
            ulong                 in_tsorig,
            ulong                 in_tspub,
            fd_stem_context_t *   stem ) {
  (void)in_idx; (void)in_seq; (void)in_sig; (void)in_sz; (void)in_tsorig; (void)in_tspub; (void)stem;
  if( FD_UNLIKELY( ctx->corrupt ) ) FD_LOG_ERR(( "Corrupt payload received" ));
  ctx->diag_iter++;
}

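/* Design note: the payload verdict is computed in during_frag but only
   acted on here.  In the stem model, during_frag reads are speculative
   (a producer may be concurrently overwriting the frag); after_frag
   only runs once the frag is known not to have been overrun, so a set
   corrupt flag here indicates genuine corruption. */
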
#define STEM_BURST 1
#define STEM_LAZY  ((long)2e6) /* target ~2 ms between housekeeping events */
#define STEM_CALLBACK_CONTEXT_ALIGN       alignof(test_dedup_rx_ctx_t)
#define STEM_CALLBACK_CONTEXT_TYPE        test_dedup_rx_ctx_t
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag
#include "../../../../disco/stem/fd_stem.c"

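/* Including fd_stem.c instantiates the stem run loop (stem_run, wired
   into the tile descriptor below) around the callbacks named above.
   STEM_BURST bounds the number of frags published per iteration (this
   tile publishes none, so 1 suffices) and STEM_LAZY sets the ballpark
   housekeeping interval in ns. */
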
static ulong
scratch_align( void ) {
  return fd_ulong_max( alignof(test_dedup_rx_ctx_t), fd_tcache_align() );
}

static ulong
scratch_footprint( fd_topo_tile_t const * tile ) {
  return FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_INIT,
    alignof(test_dedup_rx_ctx_t), sizeof(test_dedup_rx_ctx_t) ),
    fd_tcache_align(), fd_tcache_footprint( tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt ) ),
    scratch_align() );
}

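/* The FD_LAYOUT_APPEND calls above must match, in order and in
   (align,footprint) pairs, the FD_SCRATCH_ALLOC_APPEND calls in
   unprivileged_init below, so that the declared footprint exactly
   covers what init carves out of the tile's scratch region. */
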
static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  FD_TEST( tile->in_cnt>0 );

  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  test_dedup_rx_ctx_t * ctx        = FD_SCRATCH_ALLOC_APPEND( l, alignof(test_dedup_rx_ctx_t), sizeof(test_dedup_rx_ctx_t) );
  void *                tcache_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tcache_align(), fd_tcache_footprint( tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt ) );
  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );

  memset( ctx, 0, sizeof(test_dedup_rx_ctx_t) );

  /* Use in link 0's dcache workspace as the base for chunk -> laddr
     translation (assumes all in links to this tile are backed by the
     same workspace) */
  ctx->rx_base = topo->workspaces[ topo->objs[ topo->links[ tile->in_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp;

  fd_tcache_t * tcache = fd_tcache_join( fd_tcache_new( tcache_mem, tile->test_dedup_rx.tcache_depth, tile->test_dedup_rx.tcache_map_cnt ) );
  FD_TEST( tcache );

  ctx->tcache_depth   = fd_tcache_depth       ( tcache );
  ctx->tcache_map_cnt = fd_tcache_map_cnt     ( tcache );
  ctx->_tcache_sync   = fd_tcache_oldest_laddr( tcache );
  ctx->_tcache_ring   = fd_tcache_ring_laddr  ( tcache );
  ctx->_tcache_map    = fd_tcache_map_laddr   ( tcache );
  ctx->tcache_sync    = *ctx->_tcache_sync;

  ctx->diag_last_ts = fd_log_wallclock(); /* start the first diagnostic interval at boot */

  FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, tile->test_dedup_rx.rng_seq, 0UL ) ) );
}

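/* Configuration consumed above (from tile->test_dedup_rx):
   tcache_depth / tcache_map_cnt size the duplicate-detection tcache
   and rng_seq seeds the tile's rng.  The tcache is private to this
   tile, so duplicate detection is per rx tile rather than global. */
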
fd_topo_run_tile_t fd_tile_TDupRx = {
  .name              = "TDupRx",
  .scratch_align     = scratch_align,
  .scratch_footprint = scratch_footprint,
  .unprivileged_init = unprivileged_init,
  .run               = stem_run
};