Skip to content

Commit 605aef1

Browse files
authored
CBST-02: add retries (#187)
* add retries * clippy
1 parent 9fdb987 commit 605aef1

File tree

3 files changed

+107
-8
lines changed

3 files changed

+107
-8
lines changed

crates/common/src/pbs/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ impl PbsError {
3434
pub fn is_timeout(&self) -> bool {
3535
matches!(self, PbsError::Reqwest(err) if err.is_timeout())
3636
}
37+
38+
/// Whether the error is retryable in requests to relays
39+
pub fn should_retry(&self) -> bool {
40+
matches!(self, PbsError::RelayResponse { .. } | PbsError::Reqwest { .. })
41+
}
3742
}
3843

3944
#[derive(Debug, Error, PartialEq, Eq)]

crates/pbs/src/mev_boost/register_validator.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use eyre::bail;
1010
use futures::future::{join_all, select_ok};
1111
use reqwest::header::USER_AGENT;
1212
use tracing::{debug, error, Instrument};
13+
use url::Url;
1314

1415
use crate::{
1516
constants::{MAX_SIZE_DEFAULT, REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
@@ -35,7 +36,7 @@ pub async fn register_validator<S: BuilderApiState>(
3536
let mut handles = Vec::with_capacity(relays.len());
3637
for relay in relays {
3738
handles.push(tokio::spawn(
38-
send_register_validator(
39+
send_register_validator_with_timeout(
3940
registrations.clone(),
4041
relay,
4142
send_headers.clone(),
@@ -63,15 +64,61 @@ pub async fn register_validator<S: BuilderApiState>(
6364
}
6465
}
6566

66-
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))]
67-
async fn send_register_validator(
67+
/// Register validator to relay, retry connection errors until the
68+
/// given timeout has passed
69+
async fn send_register_validator_with_timeout(
6870
registrations: Vec<ValidatorRegistration>,
6971
relay: RelayClient,
7072
headers: HeaderMap,
7173
timeout_ms: u64,
7274
) -> Result<(), PbsError> {
7375
let url = relay.register_validator_url()?;
76+
let mut remaining_timeout_ms = timeout_ms;
77+
let mut retry = 0;
78+
let mut backoff = Duration::from_millis(250);
79+
80+
loop {
81+
let start_request = Instant::now();
82+
match send_register_validator(
83+
url.clone(),
84+
&registrations,
85+
&relay,
86+
headers.clone(),
87+
remaining_timeout_ms,
88+
retry,
89+
)
90+
.await
91+
{
92+
Ok(_) => return Ok(()),
93+
94+
Err(err) if err.should_retry() => {
95+
tokio::time::sleep(backoff).await;
96+
backoff += Duration::from_millis(250);
97+
98+
remaining_timeout_ms =
99+
timeout_ms.saturating_sub(start_request.elapsed().as_millis() as u64);
100+
101+
if remaining_timeout_ms == 0 {
102+
return Err(err);
103+
}
104+
}
105+
106+
Err(err) => return Err(err),
107+
};
74108

109+
retry += 1;
110+
}
111+
}
112+
113+
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref(), retry = retry))]
114+
async fn send_register_validator(
115+
url: Url,
116+
registrations: &[ValidatorRegistration],
117+
relay: &RelayClient,
118+
headers: HeaderMap,
119+
timeout_ms: u64,
120+
retry: u32,
121+
) -> Result<(), PbsError> {
75122
let start_request = Instant::now();
76123
let res = match relay
77124
.client

crates/pbs/src/mev_boost/submit_block.rs

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use cb_common::{
1212
use futures::future::select_ok;
1313
use reqwest::header::USER_AGENT;
1414
use tracing::{debug, warn};
15+
use url::Url;
1516

1617
use crate::{
1718
constants::{MAX_SIZE_SUBMIT_BLOCK, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
@@ -37,11 +38,11 @@ pub async fn submit_block<S: BuilderApiState>(
3738
let relays = state.relays();
3839
let mut handles = Vec::with_capacity(relays.len());
3940
for relay in relays.iter() {
40-
handles.push(Box::pin(send_submit_block(
41+
handles.push(Box::pin(submit_block_with_timeout(
4142
&signed_blinded_block,
4243
relay,
4344
send_headers.clone(),
44-
state.config.pbs_config.timeout_get_payload_ms,
45+
state.pbs_config().timeout_get_payload_ms,
4546
)));
4647
}
4748

@@ -52,17 +53,63 @@ pub async fn submit_block<S: BuilderApiState>(
5253
}
5354
}
5455

56+
/// Submit blinded block to relay, retry connection errors until the
57+
/// given timeout has passed
58+
async fn submit_block_with_timeout(
59+
signed_blinded_block: &SignedBlindedBeaconBlock,
60+
relay: &RelayClient,
61+
headers: HeaderMap,
62+
timeout_ms: u64,
63+
) -> Result<SubmitBlindedBlockResponse, PbsError> {
64+
let url = relay.submit_block_url()?;
65+
let mut remaining_timeout_ms = timeout_ms;
66+
let mut retry = 0;
67+
let mut backoff = Duration::from_millis(250);
68+
69+
loop {
70+
let start_request = Instant::now();
71+
match send_submit_block(
72+
url.clone(),
73+
signed_blinded_block,
74+
relay,
75+
headers.clone(),
76+
remaining_timeout_ms,
77+
retry,
78+
)
79+
.await
80+
{
81+
Ok(response) => return Ok(response),
82+
83+
Err(err) if err.should_retry() => {
84+
tokio::time::sleep(backoff).await;
85+
backoff += Duration::from_millis(250);
86+
87+
remaining_timeout_ms =
88+
timeout_ms.saturating_sub(start_request.elapsed().as_millis() as u64);
89+
90+
if remaining_timeout_ms == 0 {
91+
return Err(err);
92+
}
93+
}
94+
95+
Err(err) => return Err(err),
96+
};
97+
98+
retry += 1;
99+
}
100+
}
101+
55102
// submits blinded signed block and expects the execution payload + blobs bundle
56103
// back
57-
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))]
104+
#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref(), retry = retry))]
58105
async fn send_submit_block(
106+
url: Url,
59107
signed_blinded_block: &SignedBlindedBeaconBlock,
60108
relay: &RelayClient,
61109
headers: HeaderMap,
62110
timeout_ms: u64,
111+
retry: u32,
63112
) -> Result<SubmitBlindedBlockResponse, PbsError> {
64-
let url = relay.submit_block_url()?;
65-
66113
let start_request = Instant::now();
67114
let res = match relay
68115
.client

0 commit comments

Comments
 (0)