Skip to content

fix(pbs): timeout of get header #291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 62 additions & 43 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ use cb_common::{
use futures::future::join_all;
use parking_lot::RwLock;
use reqwest::{header::USER_AGENT, StatusCode};
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tracing::{debug, error, warn, Instrument};
use tree_hash::TreeHash;
use url::Url;
@@ -192,21 +192,20 @@ async fn send_timed_get_header(
send_freq_ms, timeout_left_ms, "TG: sending multiple header requests"
);

let mut request_id = 0;

loop {
handles.push(tokio::spawn(
handles.push(tokio::spawn(timeout(
Duration::from_millis(timeout_left_ms),
send_one_get_header(
params,
relay.clone(),
chain,
RequestContext {
timeout_ms: timeout_left_ms,
url: url.clone(),
headers: headers.clone(),
},
RequestContext { url: url.clone(), request_id, headers: headers.clone() },
validation.clone(),
)
.in_current_span(),
));
)));

if timeout_left_ms > send_freq_ms {
// enough time for one more
@@ -215,38 +214,45 @@ async fn send_timed_get_header(
} else {
break;
}

request_id += 1;
}

debug!(relay_id = relay.id.as_ref(), "TG: joining header requests");

let results = join_all(handles).await;

let filtered = results.into_iter().filter_map(|res| match res {
// ignore join error and timeouts
Ok(Ok(r)) => Some(r),
_ => None,
});

let mut maybe_header = None;
let mut start_time = 0;
let mut n_responses = 0;
let mut n_headers = 0;

if let Some((_, maybe_header)) = results
.into_iter()
.filter_map(|res| {
// ignore join error and timeouts, log other errors
res.ok().and_then(|inner_res| match inner_res {
Ok(maybe_header) => {
if maybe_header.1.is_some() {
n_headers += 1;
Some(maybe_header)
} else {
// filter out 204 responses that are returned if the request
// is after the relay cutoff
None
for res in filtered {
n_responses += 1;
match res {
Ok((req_start_time, req_header)) => {
if req_header.is_some() {
n_headers += 1;

if req_start_time > start_time {
start_time = req_start_time;
maybe_header = req_header;
}
}
Err(err) if err.is_timeout() => None,
Err(err) => {
error!(relay_id = relay.id.as_ref(),%err, "TG: error sending header request");
None
}
})
})
.max_by_key(|(start_time, _)| *start_time)
{
debug!(relay_id = relay.id.as_ref(), n_headers, "TG: received headers from relay");
return Ok(maybe_header);
} else {
}
Err(err) => {
error!(relay_id = relay.id.as_ref(),%err, "TG: error sending header request");
}
}
}

if n_responses == 0 {
// no request completed: every attempt either timed out or its task failed to join
warn!(relay_id = relay.id.as_ref(), "TG: no headers received");

@@ -255,6 +261,13 @@ async fn send_timed_get_header(
code: TIMEOUT_ERROR_CODE,
});
}

debug!(
relay_id = relay.id.as_ref(),
n_responses, n_headers, "TG: received responses from relay"
);

return Ok(maybe_header);
}
}

@@ -263,7 +276,7 @@ async fn send_timed_get_header(
params,
relay,
chain,
RequestContext { timeout_ms: timeout_left_ms, url, headers },
RequestContext { url, request_id: 0, headers },
validation,
)
.await
@@ -272,7 +285,7 @@ async fn send_timed_get_header(

struct RequestContext {
url: Url,
timeout_ms: u64,
request_id: u64,
headers: HeaderMap,
}

@@ -291,21 +304,20 @@ async fn send_one_get_header(
mut req_config: RequestContext,
validation: ValidationContext,
) -> Result<(u64, Option<GetHeaderResponse>), PbsError> {
debug!(
relay_id = relay.id.as_ref(),
req_id = req_config.request_id,
"sending get_header request"
);

// the timestamp in the header is the consensus block time, which is fixed,
// so use the start of the request as a proxy to make sure we only use the
// header from the most recently sent request
let start_request_time = utcnow_ms();
req_config.headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time));

let start_request = Instant::now();
let res = match relay
.client
.get(req_config.url)
.timeout(Duration::from_millis(req_config.timeout_ms))
.headers(req_config.headers)
.send()
.await
{
let res = match relay.client.get(req_config.url).headers(req_config.headers).send().await {
Ok(res) => res,
Err(err) => {
RELAY_STATUS_CODE
@@ -315,6 +327,8 @@ async fn send_one_get_header(
}
};

debug!(relay_id = relay.id.as_ref(), req_id = req_config.request_id, "received relay response");

let request_latency = start_request.elapsed();
RELAY_LATENCY
.with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id])
@@ -324,6 +338,9 @@ async fn send_one_get_header(
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER).await?;

debug!(relay_id = relay.id.as_ref(), req_id = req_config.request_id, "read relay response");

if !code.is_success() {
return Err(PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
@@ -333,6 +350,7 @@ async fn send_one_get_header(
if code == StatusCode::NO_CONTENT {
debug!(
relay_id = relay.id.as_ref(),
req_id = req_config.request_id,
?code,
latency = ?request_latency,
response = ?response_bytes,
@@ -353,6 +371,7 @@ async fn send_one_get_header(

debug!(
relay_id = relay.id.as_ref(),
req_id = req_config.request_id,
latency = ?request_latency,
version = get_header_response.version(),
value_eth = format_ether(get_header_response.value()),