11use std:: {
22 str:: FromStr ,
3+ sync:: Arc ,
34 time:: { Duration , Instant } ,
45} ;
56
@@ -14,7 +15,7 @@ use cb_common::{
1415 } ,
1516 utils:: { get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms} ,
1617} ;
17- use futures:: future:: select_ok;
18+ use futures:: { FutureExt , future:: select_ok} ;
1819use reqwest:: header:: USER_AGENT ;
1920use tracing:: { debug, warn} ;
2021use url:: Url ;
@@ -31,10 +32,10 @@ use crate::{
3132/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to
3233/// distinguish between the two.
3334pub async fn submit_block < S : BuilderApiState > (
34- signed_blinded_block : SignedBlindedBeaconBlock ,
35+ signed_blinded_block : Arc < SignedBlindedBeaconBlock > ,
3536 req_headers : HeaderMap ,
3637 state : PbsState < S > ,
37- api_version : & BuilderApiVersion ,
38+ api_version : BuilderApiVersion ,
3839) -> eyre:: Result < Option < SubmitBlindedBlockResponse > > {
3940 debug ! ( ?req_headers, "received headers" ) ;
4041
@@ -58,17 +59,22 @@ pub async fn submit_block<S: BuilderApiState>(
5859 send_headers. insert ( USER_AGENT , get_user_agent_with_version ( & req_headers) ?) ;
5960 send_headers. insert ( HEADER_CONSENSUS_VERSION , consensus_version) ;
6061
61- let relays = state. all_relays ( ) ;
62- let mut handles = Vec :: with_capacity ( relays. len ( ) ) ;
63- for relay in relays. iter ( ) {
64- handles. push ( Box :: pin ( submit_block_with_timeout (
65- & signed_blinded_block,
66- relay,
67- send_headers. clone ( ) ,
68- state. pbs_config ( ) . timeout_get_payload_ms ,
69- api_version,
70- fork_name,
71- ) ) ) ;
62+ let mut handles = Vec :: with_capacity ( state. all_relays ( ) . len ( ) ) ;
63+ for relay in state. all_relays ( ) . iter ( ) . cloned ( ) {
64+ handles. push (
65+ tokio:: spawn ( submit_block_with_timeout (
66+ signed_blinded_block. clone ( ) ,
67+ relay,
68+ send_headers. clone ( ) ,
69+ state. pbs_config ( ) . timeout_get_payload_ms ,
70+ api_version,
71+ fork_name,
72+ ) )
73+ . map ( |join_result| match join_result {
74+ Ok ( res) => res,
75+ Err ( err) => Err ( PbsError :: TokioJoinError ( err) ) ,
76+ } ) ,
77+ ) ;
7278 }
7379
7480 let results = select_ok ( handles) . await ;
@@ -81,14 +87,14 @@ pub async fn submit_block<S: BuilderApiState>(
8187/// Submit blinded block to relay, retry connection errors until the
8288/// given timeout has passed
8389async fn submit_block_with_timeout (
84- signed_blinded_block : & SignedBlindedBeaconBlock ,
85- relay : & RelayClient ,
90+ signed_blinded_block : Arc < SignedBlindedBeaconBlock > ,
91+ relay : RelayClient ,
8692 headers : HeaderMap ,
8793 timeout_ms : u64 ,
88- api_version : & BuilderApiVersion ,
94+ api_version : BuilderApiVersion ,
8995 fork_name : ForkName ,
9096) -> Result < Option < SubmitBlindedBlockResponse > , PbsError > {
91- let mut url = relay. submit_block_url ( * api_version) ?;
97+ let mut url = relay. submit_block_url ( api_version) ?;
9298 let mut remaining_timeout_ms = timeout_ms;
9399 let mut retry = 0 ;
94100 let mut backoff = Duration :: from_millis ( 250 ) ;
@@ -97,12 +103,12 @@ async fn submit_block_with_timeout(
97103 let start_request = Instant :: now ( ) ;
98104 match send_submit_block (
99105 url. clone ( ) ,
100- signed_blinded_block,
101- relay,
106+ & signed_blinded_block,
107+ & relay,
102108 headers. clone ( ) ,
103109 remaining_timeout_ms,
104110 retry,
105- api_version,
111+ & api_version,
106112 fork_name,
107113 )
108114 . await
0 commit comments