1
1
use std:: time:: { Duration , Instant } ;
2
2
3
- use alloy:: rpc :: types :: beacon :: relay :: ValidatorRegistration ;
3
+ use alloy:: primitives :: Bytes ;
4
4
use axum:: http:: { HeaderMap , HeaderValue } ;
5
5
use cb_common:: {
6
6
pbs:: { HEADER_START_TIME_UNIX_MS , RelayClient , error:: PbsError } ,
7
7
utils:: { get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms} ,
8
8
} ;
9
9
use eyre:: bail;
10
10
use futures:: future:: { join_all, select_ok} ;
11
- use reqwest:: header:: USER_AGENT ;
11
+ use reqwest:: header:: { CONTENT_TYPE , USER_AGENT } ;
12
12
use tracing:: { Instrument , debug, error} ;
13
13
use url:: Url ;
14
14
@@ -21,7 +21,7 @@ use crate::{
21
21
/// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator
22
22
/// Returns 200 if at least one relay returns 200, else 503
23
23
pub async fn register_validator < S : BuilderApiState > (
24
- registrations : Vec < ValidatorRegistration > ,
24
+ registrations : Vec < serde_json :: Value > ,
25
25
req_headers : HeaderMap ,
26
26
state : PbsState < S > ,
27
27
) -> eyre:: Result < ( ) > {
@@ -31,27 +31,29 @@ pub async fn register_validator<S: BuilderApiState>(
31
31
. insert ( HEADER_START_TIME_UNIX_MS , HeaderValue :: from_str ( & utcnow_ms ( ) . to_string ( ) ) ?) ;
32
32
send_headers. insert ( USER_AGENT , get_user_agent_with_version ( & req_headers) ?) ;
33
33
34
- let relays = state. all_relays ( ) . to_vec ( ) ;
35
- let mut handles = Vec :: with_capacity ( relays. len ( ) ) ;
36
- for relay in relays. clone ( ) {
37
- if let Some ( batch_size) = relay. config . validator_registration_batch_size {
38
- for batch in registrations. chunks ( batch_size) {
39
- handles. push ( tokio:: spawn (
40
- send_register_validator_with_timeout (
41
- batch. to_vec ( ) ,
42
- relay. clone ( ) ,
43
- send_headers. clone ( ) ,
44
- state. pbs_config ( ) . timeout_register_validator_ms ,
45
- state. pbs_config ( ) . register_validator_retry_limit ,
46
- )
47
- . in_current_span ( ) ,
48
- ) ) ;
49
- }
34
+ // prepare the body in advance, ugly dyn
35
+ let bodies: Box < dyn Iterator < Item = ( usize , Bytes ) > > =
36
+ if let Some ( batch_size) = state. config . pbs_config . validator_registration_batch_size {
37
+ Box :: new ( registrations. chunks ( batch_size) . map ( |batch| {
38
+ // SAFETY: unwrap is ok because we're serializing a &[serde_json::Value]
39
+ let body = serde_json:: to_vec ( batch) . unwrap ( ) ;
40
+ ( batch. len ( ) , Bytes :: from ( body) )
41
+ } ) )
50
42
} else {
43
+ let body = serde_json:: to_vec ( & registrations) . unwrap ( ) ;
44
+ Box :: new ( std:: iter:: once ( ( registrations. len ( ) , Bytes :: from ( body) ) ) )
45
+ } ;
46
+ send_headers. insert ( CONTENT_TYPE , HeaderValue :: from_static ( "application/json" ) ) ;
47
+
48
+ let mut handles = Vec :: with_capacity ( state. all_relays ( ) . len ( ) ) ;
49
+
50
+ for ( n_regs, body) in bodies {
51
+ for relay in state. all_relays ( ) . iter ( ) . cloned ( ) {
51
52
handles. push ( tokio:: spawn (
52
53
send_register_validator_with_timeout (
53
- registrations. clone ( ) ,
54
- relay. clone ( ) ,
54
+ n_regs,
55
+ body. clone ( ) ,
56
+ relay,
55
57
send_headers. clone ( ) ,
56
58
state. pbs_config ( ) . timeout_register_validator_ms ,
57
59
state. pbs_config ( ) . register_validator_retry_limit ,
@@ -82,7 +84,8 @@ pub async fn register_validator<S: BuilderApiState>(
82
84
/// Register validator to relay, retry connection errors until the
83
85
/// given timeout has passed
84
86
async fn send_register_validator_with_timeout (
85
- registrations : Vec < ValidatorRegistration > ,
87
+ n_regs : usize ,
88
+ body : Bytes ,
86
89
relay : RelayClient ,
87
90
headers : HeaderMap ,
88
91
timeout_ms : u64 ,
@@ -97,7 +100,8 @@ async fn send_register_validator_with_timeout(
97
100
let start_request = Instant :: now ( ) ;
98
101
match send_register_validator (
99
102
url. clone ( ) ,
100
- & registrations,
103
+ n_regs,
104
+ body. clone ( ) ,
101
105
& relay,
102
106
headers. clone ( ) ,
103
107
remaining_timeout_ms,
@@ -134,7 +138,8 @@ async fn send_register_validator_with_timeout(
134
138
135
139
async fn send_register_validator (
136
140
url : Url ,
137
- registrations : & [ ValidatorRegistration ] ,
141
+ n_regs : usize ,
142
+ body : Bytes ,
138
143
relay : & RelayClient ,
139
144
headers : HeaderMap ,
140
145
timeout_ms : u64 ,
@@ -146,7 +151,7 @@ async fn send_register_validator(
146
151
. post ( url)
147
152
. timeout ( Duration :: from_millis ( timeout_ms) )
148
153
. headers ( headers)
149
- . json ( & registrations )
154
+ . body ( body . 0 )
150
155
. send ( )
151
156
. await
152
157
{
@@ -189,7 +194,7 @@ async fn send_register_validator(
189
194
retry,
190
195
?code,
191
196
latency = ?request_latency,
192
- num_registrations = registrations . len ( ) ,
197
+ num_registrations = n_regs ,
193
198
"registration successful"
194
199
) ;
195
200
0 commit comments