Commit 9124ad9

repair: queue-based repair replacement for tree-traversal
* handles tricky edge cases: dropping peers, being at the head of turbine, and the iterator being invalidated on publish
1 parent ab3acc9 commit 9124ad9

File tree

9 files changed: +690 -436 lines changed

src/discof/forest/fd_forest.c

Lines changed: 186 additions & 118 deletions
Large diffs are not rendered by default.

src/discof/forest/fd_forest.h

Lines changed: 143 additions & 100 deletions
Large diffs are not rendered by default.

src/discof/forest/test_forest.c

Lines changed: 204 additions & 96 deletions
Large diffs are not rendered by default.

src/discof/repair/fd_inflight.c

Lines changed: 14 additions & 2 deletions
@@ -51,7 +51,7 @@ fd_inflights_join( void * shmem ) {
 }
 
 void
-fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey ) {
+fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey, ulong slot, ulong shred_idx ) {
   if( FD_UNLIKELY( !fd_inflight_pool_free( table->pool ) ) ) {
     fd_inflight_t * evict = fd_inflight_dlist_ele_pop_head( table->dlist, table->pool );
     fd_inflight_map_ele_remove( table->map, &evict->nonce, NULL, table->pool );
@@ -62,8 +62,10 @@ fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t co
   inflight_req->nonce        = nonce;
   inflight_req->timestamp_ns = fd_log_wallclock();
   inflight_req->pubkey       = *pubkey;
+  inflight_req->slot         = slot;
+  inflight_req->shred_idx    = shred_idx;
 
-  fd_inflight_map_ele_insert( table->map, inflight_req, table->pool );
+  fd_inflight_map_ele_insert     ( table->map, inflight_req, table->pool );
   fd_inflight_dlist_ele_push_tail( table->dlist, inflight_req, table->pool );
 }
 
@@ -84,6 +86,16 @@ fd_inflights_request_remove( fd_inflights_t * table, ulong nonce, fd_pubkey_t *
   return 0;
 }
 
+void
+fd_inflights_request_pop( fd_inflights_t * table, ulong * nonce_out, ulong * slot_out, ulong * shred_idx_out ) {
+  fd_inflight_t * inflight_req = fd_inflight_dlist_ele_pop_head( table->dlist, table->pool );
+  fd_inflight_map_ele_remove( table->map, &inflight_req->nonce, NULL, table->pool );
+  *nonce_out     = inflight_req->nonce;
+  *slot_out      = inflight_req->slot;
+  *shred_idx_out = inflight_req->shred_idx;
+  fd_inflight_pool_ele_release( table->pool, inflight_req );
+}
+
 fd_inflight_t *
 fd_inflights_request_query( fd_inflights_t * table, ulong nonce ) {
   return fd_inflight_map_ele_query( table->map, &nonce, NULL, table->pool );
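
Usage note: the extended insert signature stores the (slot, shred_idx) pair alongside the nonce, and inserting into a full pool silently evicts the oldest request, so callers never need to check capacity. A minimal round trip under these semantics (the nonce, slot, index, and peer values below are placeholders):

  fd_inflights_request_insert( table, 42UL, &peer_pubkey, 12345UL, 7UL );
  /* ... later, once the request is considered timed out ... */
  ulong nonce, slot, shred_idx;
  fd_inflights_request_pop( table, &nonce, &slot, &shred_idx );
  /* nonce==42UL, slot==12345UL, shred_idx==7UL: ready to re-request */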

src/discof/repair/fd_inflight.h

Lines changed: 35 additions & 8 deletions
@@ -4,13 +4,18 @@
 #include "../../flamenco/types/fd_types.h"
 
 /* fd_inflights tracks repair requests that are inflight to other
-   validators. This module is not necessary for the repair protocol and
-   strategy, but is useful for metrics and reporting. Incorrect updates
-   and removals from this module are non-critical. Requests are key-ed
-   by nonce as in the current strategy (see fd_policy.h), all requests
-   have a unique nonce. The chances that an inflight request does not
-   get a response are non-negligible due to shred tile upstream deduping
-   duplicates. */
+   validators. This module is useful for metrics and reporting.
+   Inexact updates of orphan requests and highest window requests from
+   this module are non-critical, but exact updates of shred requests
+   are critical. The repair tile relies on this module to be able to
+   re-request any shreds that it has sent, because policy next does not
+   request any shred twice.
+   (TODO should this be rolled into policy.h?)
+
+   Requests are key-ed by nonce as in the current strategy (see
+   fd_policy.h); all requests have a unique nonce. The chances that an
+   inflight request does not get a response are non-negligible due to
+   the shred tile upstream deduping duplicates. */
 
 /* Max number of pending requests */
 #define FD_INFLIGHT_REQ_MAX (1<<20)
@@ -21,6 +26,9 @@ struct __attribute__((aligned(128UL))) fd_inflight {
   long        timestamp_ns; /* timestamp when request was created (nanoseconds) */
   fd_pubkey_t pubkey;       /* public key of the peer */
 
+  ulong slot;      /* slot of the request */
+  ulong shred_idx; /* shred index of the request */
+
   /* Reserved for DLL eviction */
   ulong prevll; /* pool index of previous element in DLL */
   ulong nextll; /* pool index of next element in DLL */
@@ -74,11 +82,30 @@
 fd_inflights_join( void * shmem );
 
 void
-fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey );
+fd_inflights_request_insert( fd_inflights_t * table, ulong nonce, fd_pubkey_t const * pubkey, ulong slot, ulong shred_idx );
 
 long
 fd_inflights_request_remove( fd_inflights_t * table, ulong nonce, fd_pubkey_t * peer_out );
 
+/* Important! Caller must guarantee that the request list is not empty.
+   This function cannot fail and will always try to populate the output
+   parameters. Typical use should only call this after
+   fd_inflights_should_drain returns true. */
+
+void
+fd_inflights_request_pop( fd_inflights_t * table, ulong * nonce_out, ulong * slot_out, ulong * shred_idx_out );
+
+static inline int
+fd_inflights_should_drain( fd_inflights_t * table, long now ) {
+  /* peek at head */
+  if( FD_UNLIKELY( fd_inflight_dlist_is_empty( table->dlist, table->pool ) ) ) return 0;
+
+  fd_inflight_t * inflight_req = fd_inflight_dlist_ele_peek_head( table->dlist, table->pool );
+  if( FD_UNLIKELY( inflight_req->timestamp_ns + 90e6L < now ) ) return 1;
+  return 0;
+}
+
 fd_inflight_t *
 fd_inflights_request_query ( fd_inflights_t * table, ulong nonce );
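
Taken together, the two new functions give the repair tile a timeout-driven replay loop. A minimal sketch, assuming it runs once per housekeeping pass; `resend` is a hypothetical callback standing in for the tile's actual re-request path:

  static void
  inflights_drain( fd_inflights_t * table,
                   void (* resend)( ulong slot, ulong shred_idx ) ) {
    long now = fd_log_wallclock();
    while( fd_inflights_should_drain( table, now ) ) { /* head inflight longer than ~90ms? */
      ulong nonce, slot, shred_idx;
      fd_inflights_request_pop( table, &nonce, &slot, &shred_idx );
      resend( slot, shred_idx );
    }
  }

Because fd_inflights_should_drain returns 0 on an empty dlist, the pop precondition (non-empty request list) holds on every iteration.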

src/discof/repair/fd_policy.c

Lines changed: 34 additions & 63 deletions
@@ -41,7 +41,6 @@ fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
   policy->peers.pool = fd_peer_pool_new ( peers_pool, peer_max );
   policy->peers.fast = fd_peer_dlist_new( peers_fast );
   policy->peers.slow = fd_peer_dlist_new( peers_slow );
-  policy->iterf.ele_idx = ULONG_MAX;
   policy->turbine_slot0 = ULONG_MAX;
   policy->tsreset       = 0;
   policy->nonce         = 1;
@@ -165,6 +164,8 @@ fd_policy_peer_select( fd_policy_t * policy ) {
   fd_peer_dlist_t * worst_dlist = policy->peers.slow;
   fd_peer_t *       pool        = policy->peers.pool;
 
+  if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;
+
   fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
 
   while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
@@ -202,18 +203,13 @@ fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair
     }
   }
 
-  /* Every so often we'll need to reset the frontier iterator to the
-     head of frontier, because we could end up traversing down a very
-     long tree if we are far behind. */
+  /**********************/
+  /*  ADVANCE ITERATOR  */
+  /**********************/
 
-  if( FD_UNLIKELY( now_ms - policy->tsreset > 100UL /* ms */ ||
-                   policy->iterf.frontier_ver != fd_fseq_query( fd_forest_ver_const( forest ) ) ) ) {
-    fd_policy_reset( policy, forest );
-  }
-
-  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
-  if( FD_UNLIKELY( !ele ) ) {
-    // This happens when we are fully caught up i.e. we have all the shreds of every slot we know about.
+  fd_forest_iter_next( forest );
+  if( FD_UNLIKELY( fd_forest_iter_done( forest ) ) ) {
+    // This happens when we have already requested all the shreds we know about.
     return NULL;
   }
 
@@ -231,53 +227,34 @@ fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair
      next valid requestable element. */
 
   int req_made = 0;
-  while( !req_made ) {
-    ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
-
-    if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
-      /* We are not ready to repair this slot yet. But it's possible we
-         have another fork that we need to repair... so we should just
-         skip to the next SLOT in the consumed iterator. The likelihood
-         that this ele is the head of turbine is high, which means that
-         the shred_idx of the iterf is likely to be UINT_MAX, which
-         means calling fd_forest_iter_next will advance the iterf to
-         the next slot. */
-      policy->iterf.shred_idx = UINT_MAX; // heinous... i'm sorry
-      policy->iterf = fd_forest_iter_next( policy->iterf, forest );
-      if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
-        policy->iterf = fd_forest_iter_init( forest );
-        break;
-      }
-      continue;
-    }
 
-    if( FD_UNLIKELY( policy->iterf.shred_idx == UINT_MAX ) ) {
-      ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_HIGHEST_SHRED, ele->slot, 0 );
-      if( FD_UNLIKELY( ele->slot < highest_known_slot && !dedup_next( policy, key, now ) ) ) {
-        // We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
-        out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
-        policy->nonce++;
-        req_made = 1;
-      }
-    } else {
-      ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_SHRED, ele->slot, policy->iterf.shred_idx );
-      if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
-        out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, policy->iterf.shred_idx );
-        policy->nonce++;
-        if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
-        req_made = 1;
-      }
-    }
-
-    /* Even if we have a request ready, we need to advance the iterator.
-       Otherwise on the next call of policy_next, we'll try to re-request
-       the same shred and it will get deduped. */
+  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, forest->iter.ele_idx );
+  if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
+    /* We are not ready to repair this slot yet. But it's possible we
+       have another fork that we need to repair... so we should just
+       skip to the next SLOT in the consumed iterator. The likelihood
+       that this ele is the head of turbine is high, which means that
+       the shred_idx of the iterf is likely to be UINT_MAX, which means
+       calling fd_forest_iter_next will advance the iterf to the next
+       slot. */
+    forest->iter.shred_idx = UINT_MAX;
+    /* TODO: Heinous... I'm sorry. Easiest way to ensure this slot gets
+       added back to the requests deque, but maybe there should be an
+       explicit API for it. */
+    return NULL;
+  }
 
-    policy->iterf = fd_forest_iter_next( policy->iterf, forest );
-    if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
-      policy->iterf = fd_forest_iter_init( forest );
-      break;
+  if( FD_UNLIKELY( forest->iter.shred_idx == UINT_MAX ) ) {
+    if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
+      // We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
+      out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
+      policy->nonce++;
+      req_made = 1;
     }
+  } else {
+    out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, forest->iter.shred_idx );
+    policy->nonce++;
+    if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
+    req_made = 1;
   }
 
   if( FD_UNLIKELY( !req_made ) ) return NULL;
@@ -325,7 +302,7 @@ fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
 
   if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
     /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
-    fd_peer_dlist_t * dlist = policy->peers.select.stage == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
+    fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
     policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
   }
 
@@ -365,12 +342,6 @@ fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, lo
   }
 }
 
-void
-fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest ) {
-  policy->iterf   = fd_forest_iter_init( forest );
-  policy->tsreset = ts_ms( fd_log_wallclock() );
-}
-
 void
 fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
   policy->turbine_slot0 = slot;
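
Stripped of the dedup and throttling details above, the new control flow of fd_policy_next reduces to the following sketch (assuming `pool` is the forest element pool already in scope in that function):

  fd_forest_iter_next( forest );                    /* queue-based: pop the next (slot, shred_idx) */
  if( FD_UNLIKELY( fd_forest_iter_done( forest ) ) ) return NULL; /* everything known has been requested */

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, forest->iter.ele_idx );
  if( forest->iter.shred_idx == UINT_MAX ) {
    /* slot-level: request the highest shred of ele->slot (skipped for the current turbine slot) */
  } else {
    /* shred-level: request shred forest->iter.shred_idx of ele->slot */
  }

Moving the iterator state from policy (the removed policy->iterf) into the forest itself is what addresses the stale-iterator problem named in the commit message: the forest can invalidate or refill its own request queue on publish or peer drop, and policy no longer holds a copy that can go out of sync.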

src/discof/repair/fd_policy.h

Lines changed: 1 addition & 5 deletions
@@ -180,7 +180,6 @@ struct fd_policy {
   long tsmax; /* maximum time for an iteration before resetting the DFS to root */
   long tsref; /* reference timestamp for resetting DFS */
 
-  fd_forest_iter_t iterf; /* forest iterator */
   ulong tsreset; /* ms timestamp of last reset of iterf */
 
   ulong turbine_slot0;
@@ -287,7 +286,4 @@ fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, lo
 void
 fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot );
 
-void
-fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest );
-
-#endif /* HEADER_fd_src_discof_repair_fd_policy_h */
+#endif /* HEADER_fd_src_choreo_policy_fd_policy_h */

src/discof/repair/fd_repair_metrics.h

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ struct fd_slot_metrics {
 };
 typedef struct fd_slot_metrics fd_slot_metrics_t;
 
-#define FD_CATCHUP_METRICS_MAX 256
+#define FD_CATCHUP_METRICS_MAX 16384
 
 struct fd_repair_metrics_t {
   fd_slot_metrics_t slots[ FD_CATCHUP_METRICS_MAX ];
