Skip to content

Commit 85d8cc4

Browse files
authored
Push tbv via block processor (#17)
* Best true value pusher generic. Moved redis code to redis_backend * Working! * tests
1 parent 3342fd2 commit 85d8cc4

11 files changed

+334
-89
lines changed

src/blocks_processor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ impl<HttpClientType: ClientT> BlocksProcessorClient<HttpClientType> {
198198
Ok(())
199199
}
200200

201-
/// T is parametric just because it too BIG to type
202201
fn handle_rpc_error(err: &jsonrpsee::core::Error, request: &ConsumeBuiltBlockRequest) {
203202
match err {
204203
jsonrpsee::core::Error::Call(error_object) => {

src/flashbots_config.rs

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
use alloy_signer_local::PrivateKeySigner;
55
use eyre::Context;
66
use http::StatusCode;
7-
use jsonrpsee::http_client::HttpClientBuilder;
87
use jsonrpsee::RpcModule;
98
use rbuilder::building::builders::merging_builder::merging_build_backtest;
109
use rbuilder::building::builders::UnfinishedBlockBuildingSinkFactory;
@@ -36,7 +35,6 @@ use reth_db::DatabaseEnv;
3635
use serde::Deserialize;
3736
use serde_with::serde_as;
3837
use tokio_util::sync::CancellationToken;
39-
use tower::ServiceBuilder;
4038
use tracing::{error, warn};
4139
use url::Url;
4240

@@ -48,7 +46,6 @@ use crate::blocks_processor::{
4846
SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD,
4947
};
5048
use crate::build_info::rbuilder_version;
51-
use crate::flashbots_signer::FlashbotsSignerLayer;
5249
use crate::true_block_value_push::unfinished_block_building_sink_factory_wrapper::UnfinishedBlockBuildingSinkFactoryWrapper;
5350

5451
use clickhouse::Client;
@@ -103,12 +100,16 @@ pub struct FlashbotsConfig {
103100
/// selected builder configurations
104101
pub builders: Vec<BuilderConfig>,
105102

106-
/// If this is Some then signed mode is used for blocks_processor and tbv_push is done via blocks_processor_url (signed block-processor also handles flashbots_reportBestTrueValue)
103+
/// If this is Some then blocks_processor_url MUST be some and:
104+
/// - signed mode is used for blocks_processor.
105+
/// - tbv_push is done via blocks_processor_url (signed block-processor also handles flashbots_reportBestTrueValue).
107106
pub key_registration_url: Option<String>,
108107

109108
pub blocks_processor_url: Option<String>,
110109

111-
/// For production: Some <=> key_registration_url is Some since it's used by smart-multiplexing.
110+
/// Cfg to push tbv to redis.
111+
/// For production we always need some tbv push (since it's used by smart-multiplexing.) so:
112+
/// !Some(key_registration_url) => Some(tbv_push_redis)
112113
tbv_push_redis: Option<TBVPushRedisConfig>,
113114
}
114115

@@ -251,30 +252,31 @@ impl FlashbotsConfig {
251252
&self,
252253
block_processor_key: Option<PrivateKeySigner>,
253254
) -> eyre::Result<Box<dyn BidObserver + Send + Sync>> {
254-
let bid_observer: Box<dyn BidObserver + Send + Sync> =
255-
if let Some(url) = &self.blocks_processor_url {
256-
if let Some(block_processor_key) = block_processor_key {
257-
let signing_middleware = FlashbotsSignerLayer::new(block_processor_key);
258-
let service_builder = ServiceBuilder::new()
259-
// map signer errors to http errors
260-
.map_err(jsonrpsee::http_client::transport::Error::Http)
261-
.layer(signing_middleware);
262-
let client = HttpClientBuilder::default()
263-
.set_middleware(service_builder)
264-
.build(url)?;
265-
let block_processor =
266-
BlocksProcessorClient::new(client, SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD);
267-
Box::new(BlocksProcessorClientBidObserver::new(block_processor))
268-
} else {
269-
let client = BlocksProcessorClient::try_from(url)?;
270-
Box::new(BlocksProcessorClientBidObserver::new(client))
271-
}
255+
let bid_observer: Box<dyn BidObserver + Send + Sync> = if let Some(url) =
256+
&self.blocks_processor_url
257+
{
258+
if let Some(block_processor_key) = block_processor_key {
259+
let client = crate::signed_http_client::create_client(url, block_processor_key)?;
260+
let block_processor =
261+
BlocksProcessorClient::new(client, SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD);
262+
Box::new(BlocksProcessorClientBidObserver::new(block_processor))
272263
} else {
273-
Box::new(NullBidObserver {})
274-
};
264+
let client = BlocksProcessorClient::try_from(url)?;
265+
Box::new(BlocksProcessorClientBidObserver::new(client))
266+
}
267+
} else {
268+
if block_processor_key.is_some() {
269+
return Self::bail_blocks_processor_url_not_set();
270+
}
271+
Box::new(NullBidObserver {})
272+
};
275273
Ok(bid_observer)
276274
}
277275

276+
fn bail_blocks_processor_url_not_set<T>() -> Result<T, eyre::Report> {
277+
eyre::bail!("blocks_processor_url should always be set if key_registration_url is set");
278+
}
279+
278280
/// Connects (UnfinishedBlockBuildingSinkFactoryWrapper->BlockSealingBidderFactory)->RelaySubmitSinkFactory
279281
/// RelaySubmitSinkFactory: submits final blocks to relays
280282
/// BlockSealingBidderFactory: performs sealing/bidding. Sends bids to the RelaySubmitSinkFactory
@@ -290,12 +292,15 @@ impl FlashbotsConfig {
290292
Arc<dyn BiddingServiceWinControl>,
291293
)> {
292294
let block_processor_key = if let Some(key_registration_url) = &self.key_registration_url {
295+
if self.blocks_processor_url.is_none() {
296+
return Self::bail_blocks_processor_url_not_set();
297+
}
293298
Some(self.register_key(key_registration_url).await?)
294299
} else {
295300
None
296301
};
297302
// RelaySubmitSinkFactory
298-
let bid_observer = self.create_block_processor_client(block_processor_key)?;
303+
let bid_observer = self.create_block_processor_client(block_processor_key.clone())?;
299304
let (sink_sealed_factory, relays) = self
300305
.l1_config
301306
.create_relays_sealed_sink_factory(self.base_config.chain_spec()?, bid_observer)?;
@@ -321,32 +326,56 @@ impl FlashbotsConfig {
321326
));
322327

323328
// UnfinishedBlockBuildingSinkFactoryWrapper
324-
let wrapped_sink_factory =
325-
self.wrap_with_tbv_redis_pusher(&cancellation_token, &bid_value_source, sink_factory)?;
329+
let wrapped_sink_factory = self.wrap_with_tbv_pusher(
330+
&cancellation_token,
331+
&bid_value_source,
332+
block_processor_key,
333+
sink_factory,
334+
)?;
326335

327336
Ok((wrapped_sink_factory, relays, bidding_service_win_control))
328337
}
329338

330-
fn wrap_with_tbv_redis_pusher(
339+
/// Wraps the factory with one that sends a to TBV stream to our infra.
340+
/// block_processor_key == Some -> We use signed block processor API.
341+
/// block_processor_key == None -> If Some(tbv_push_redis) -> We send directly to a redis channel.
342+
fn wrap_with_tbv_pusher(
331343
&self,
332344
cancellation_token: &CancellationToken,
333345
bid_value_source: &Arc<dyn BidValueSource + Send + Sync>,
346+
block_processor_key: Option<PrivateKeySigner>,
334347
factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
335348
) -> eyre::Result<Box<dyn UnfinishedBlockBuildingSinkFactory>> {
336-
if let Some(cfg) = &self.tbv_push_redis {
349+
if let Some(block_processor_key) = block_processor_key {
350+
if let Some(blocks_processor_url) = &self.blocks_processor_url {
351+
Ok(Box::new(
352+
UnfinishedBlockBuildingSinkFactoryWrapper::new_block_processor(
353+
factory,
354+
bid_value_source.clone(),
355+
blocks_processor_url.clone(),
356+
block_processor_key,
357+
cancellation_token.clone(),
358+
)?,
359+
))
360+
} else {
361+
Self::bail_blocks_processor_url_not_set()
362+
}
363+
} else if let Some(cfg) = &self.tbv_push_redis {
337364
let tbv_push_redis_url_value = cfg
338365
.url
339366
.as_ref()
340367
.ok_or(eyre::Report::msg("Missing tbv_push_redis_url"))?
341368
.value()
342369
.context("tbv_push_redis_url")?;
343-
Ok(Box::new(UnfinishedBlockBuildingSinkFactoryWrapper::new(
344-
factory,
345-
bid_value_source.clone(),
346-
tbv_push_redis_url_value,
347-
cfg.channel.clone(),
348-
cancellation_token.clone(),
349-
)?))
370+
Ok(Box::new(
371+
UnfinishedBlockBuildingSinkFactoryWrapper::new_redis(
372+
factory,
373+
bid_value_source.clone(),
374+
tbv_push_redis_url_value,
375+
cfg.channel.clone(),
376+
cancellation_token.clone(),
377+
)?,
378+
))
350379
} else {
351380
Ok(factory)
352381
}

src/flashbots_signer.rs

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,27 +109,27 @@ where
109109
"{:?}:0x{}",
110110
signer.address(),
111111
hex::encode(signature.as_bytes())
112-
))
113-
.expect("Header contains invalid characters");
112+
))?;
114113
parts.headers.insert(FLASHBOTS_HEADER.clone(), header_val);
115114

116115
let request = Request::from_parts(parts, Body::from(body_bytes.clone()));
117116
inner.call(request).await.map_err(Into::into)
118117
})
119118
}
120119
}
121-
/*
120+
122121
#[cfg(test)]
123122
mod tests {
124123
use super::*;
124+
use alloy_signer_local::PrivateKeySigner;
125125
use http::Response;
126126
use hyper::Body;
127127
use std::convert::Infallible;
128128
use tower::{service_fn, ServiceExt};
129129

130130
#[tokio::test]
131131
async fn test_signature() {
132-
let fb_signer = LocalWallet::new(&mut thread_rng());
132+
let fb_signer = PrivateKeySigner::random();
133133

134134
// mock service that returns the request headers
135135
let svc = FlashbotsSigner {
@@ -164,19 +164,18 @@ mod tests {
164164

165165
let signer_address = format!("{:?}", fb_signer.address());
166166
let expected_signature = fb_signer
167-
.sign_message(format!("0x{:x}", H256::from(keccak256(bytes.clone()))))
167+
.sign_message(format!("{:?}", keccak256(bytes.clone())).as_bytes())
168168
.await
169-
.unwrap()
170-
.to_string();
171-
169+
.unwrap();
170+
let expected_signature = hex::encode(expected_signature.as_bytes());
172171
// verify that the header contains expected address and signature
173172
assert_eq!(header_address, signer_address);
174173
assert_eq!(header_signature, expected_signature);
175174
}
176175

177176
#[tokio::test]
178177
async fn test_skips_non_json() {
179-
let fb_signer = LocalWallet::new(&mut thread_rng());
178+
let fb_signer = PrivateKeySigner::random();
180179

181180
// mock service that returns the request headers
182181
let svc = FlashbotsSigner {
@@ -210,7 +209,7 @@ mod tests {
210209

211210
#[tokio::test]
212211
async fn test_returns_error_when_not_post() {
213-
let fb_signer = LocalWallet::new(&mut thread_rng());
212+
let fb_signer = PrivateKeySigner::random();
214213

215214
// mock service that returns the request headers
216215
let svc = FlashbotsSigner {
@@ -240,5 +239,52 @@ mod tests {
240239
// should be an error
241240
assert!(res.is_err());
242241
}
242+
243+
/// Uses a static private key and compares the signature generated by this package to the signature
244+
/// generated by the `cast` CLI.
245+
/// Test copied from https://github.com/flashbots/go-utils/blob/main/signature/signature_test.go#L146 501d395be6a9802494ef1ef25a755acaa4448c17 (TestSignatureCreateCompareToCastAndEthers)
246+
#[tokio::test]
247+
async fn test_signature_cast() {
248+
let fb_signer: PrivateKeySigner =
249+
"fad9c8855b740a0b7ed4c221dbad0f33a83a49cad6b3fe8d5817ac83d38b6a19"
250+
.parse()
251+
.unwrap();
252+
// mock service that returns the request headers
253+
let svc = FlashbotsSigner {
254+
signer: fb_signer.clone(),
255+
inner: service_fn(|_req: Request<Body>| async {
256+
let (parts, _) = _req.into_parts();
257+
258+
let mut res = Response::builder();
259+
for (k, v) in parts.headers.iter() {
260+
res = res.header(k, v);
261+
}
262+
let res = res.body(Body::empty()).unwrap();
263+
Ok::<_, Infallible>(res)
264+
}),
265+
};
266+
267+
// build request
268+
let bytes = "Hello".as_bytes();
269+
let req = Request::builder()
270+
.method(http::Method::POST)
271+
.header(http::header::CONTENT_TYPE, "application/json")
272+
.body(Body::from(bytes))
273+
.unwrap();
274+
275+
let res = svc.oneshot(req).await.unwrap();
276+
277+
let header = res.headers().get("x-flashbots-signature").unwrap();
278+
let header = header.to_str().unwrap();
279+
let header = header.split(":0x").collect::<Vec<_>>();
280+
let header_address = header[0];
281+
let header_signature = header[1];
282+
// I generated the signature using the cast CLI:
283+
// cast wallet sign --private-key fad9c8855b740a0b7ed4c221dbad0f33a83a49cad6b3fe8d5817ac83d38b6a19 $(cast from-utf8 $(cast keccak Hello))
284+
let signer_address = "0x96216849c49358B10257cb55b28eA603c874b05E".to_lowercase();
285+
let expected_signature = "1446053488f02d460c012c84c4091cd5054d98c6cfca01b65f6c1a72773e80e60b8a4931aeee7ed18ce3adb45b2107e8c59e25556c1f871a8334e30e5bddbed21c";
286+
// verify that the header contains expected address and signature
287+
assert_eq!(header_address, signer_address);
288+
assert_eq!(header_signature, expected_signature);
289+
}
243290
}
244-
*/

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ pub mod build_info;
66
pub mod flashbots_config;
77
pub mod flashbots_signer;
88
pub mod metrics;
9+
pub mod signed_http_client;
910
mod true_block_value_push;

src/signed_http_client.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use crate::flashbots_signer::{FlashbotsSigner, FlashbotsSignerLayer};
2+
use alloy_signer_local::PrivateKeySigner;
3+
use jsonrpsee::http_client::transport::Error as JsonError;
4+
use jsonrpsee::http_client::HttpClientBuilder;
5+
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
6+
use tower::ServiceBuilder;
7+
type MapErrorFn = fn(Box<dyn std::error::Error + Send + Sync>) -> JsonError;
8+
9+
const fn map_error(err: Box<dyn std::error::Error + Send + Sync>) -> JsonError {
10+
JsonError::Http(err)
11+
}
12+
13+
pub type SignedHttpClient =
14+
HttpClient<tower::util::MapErr<FlashbotsSigner<PrivateKeySigner, HttpBackend>, MapErrorFn>>;
15+
16+
pub fn create_client(
17+
url: &str,
18+
signer: PrivateKeySigner,
19+
) -> Result<SignedHttpClient, jsonrpsee::core::Error> {
20+
let signing_middleware = FlashbotsSignerLayer::new(signer);
21+
let service_builder = ServiceBuilder::new()
22+
// Coerce to function pointer and remove the + 'static added to the closure
23+
.map_err(map_error as MapErrorFn)
24+
.layer(signing_middleware);
25+
let client = HttpClientBuilder::default()
26+
.set_middleware(service_builder)
27+
.build(url)?;
28+
Ok(client)
29+
}

0 commit comments

Comments
 (0)