Skip to content

Commit e08f55c

Browse files
authored
feature: adds Flashbots submit task (#144)
# feature: adds Flashbots submit task - adds a `FlashbotsTask` for submitting rollup block transactions to the Flashbots API - imports the Flashbots crate from `bin-base`​
1 parent 05ecd2e commit e08f55c

File tree

22 files changed

+911
-787
lines changed

22 files changed

+911
-787
lines changed

Cargo.lock

Lines changed: 559 additions & 609 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@ name = "builder"
1717
name = "zenith-builder-example"
1818
path = "bin/builder.rs"
1919

20-
[features]
21-
integration = []
22-
2320
[dependencies]
24-
init4-bin-base = { version = "0.12.3", features = ["perms", "aws"] }
21+
init4-bin-base = { version = "0.12.3", features = ["perms", "aws", "flashbots"] }
2522

2623
signet-constants = { version = "0.10.1" }
2724
signet-sim = { version = "0.10.1" }
@@ -36,22 +33,22 @@ alloy = { version = "1.0.25", features = [
3633
"json-rpc",
3734
"signer-aws",
3835
"rpc-types-mev",
36+
"rpc-client",
37+
"rpc-types-debug",
3938
"rlp",
4039
"node-bindings",
4140
"serde",
42-
"getrandom"
41+
"getrandom",
4342
] }
4443

45-
serde = { version = "1.0.197", features = ["derive"] }
46-
4744
axum = "0.7.5"
45+
chrono = "0.4.40"
4846
eyre = "0.6.12"
4947
openssl = { version = "0.10", features = ["vendored"] }
5048
reqwest = { version = "0.12.22", features = ["blocking", "json"] }
51-
serde_json = "1.0"
49+
serde = { version = "1.0.197", features = ["derive"] }
50+
serde_json = "1.0.145"
51+
tracing = "0.1.41"
5252
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
53-
chrono = "0.4.40"
54-
5553
tokio-stream = "0.1.17"
5654
url = "2.5.4"
57-
tracing = "0.1.41"

bin/builder.rs

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
use builder::{
22
config::BuilderConfig,
33
service::serve_builder,
4-
tasks::{
5-
block::sim::Simulator, cache::CacheTasks, env::EnvTask, metrics::MetricsTask,
6-
submit::BuilderHelperTask,
7-
},
4+
tasks::{block::sim::Simulator, cache::CacheTasks, env::EnvTask, metrics::MetricsTask},
85
};
96
use init4_bin_base::{
107
deps::tracing::{info, info_span},
118
utils::from_env::FromEnv,
129
};
13-
use signet_types::constants::SignetSystemConstants;
1410
use tokio::select;
1511

1612
// Note: Must be set to `multi_thread` to support async tasks.
@@ -22,49 +18,33 @@ async fn main() -> eyre::Result<()> {
2218

2319
// Pull the configuration from the environment
2420
let config = BuilderConfig::from_env()?.clone();
25-
let constants = SignetSystemConstants::pecorino();
2621

27-
// We connect the WS greedily, so we can fail early if the connection is
28-
// invalid.
29-
let ru_provider = config.connect_ru_provider().await?;
22+
// We connect the providers greedily, so we can fail early if the
23+
// RU WS connection is invalid.
24+
let (ru_provider, host_provider) =
25+
tokio::try_join!(config.connect_ru_provider(), config.connect_host_provider(),)?;
3026

3127
// Spawn the EnvTask
32-
let env_task = EnvTask::new(
33-
config.clone(),
34-
constants.clone(),
35-
config.connect_host_provider().await?,
36-
ru_provider.clone(),
37-
);
28+
let env_task = EnvTask::new(config.clone(), host_provider.clone(), ru_provider.clone());
3829
let (block_env, env_jh) = env_task.spawn();
3930

4031
// Spawn the cache system
4132
let cache_tasks = CacheTasks::new(config.clone(), block_env.clone());
4233
let cache_system = cache_tasks.spawn();
4334

44-
// Prep providers and contracts
45-
let (host_provider, quincey) =
46-
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
47-
let zenith = config.connect_zenith(host_provider.clone());
48-
4935
// Set up the metrics task
50-
let metrics = MetricsTask { host_provider: host_provider.clone() };
36+
let metrics = MetricsTask { host_provider };
5137
let (tx_channel, metrics_jh) = metrics.spawn();
5238

53-
// Make a Tx submission task
54-
let submit = BuilderHelperTask {
55-
zenith,
56-
quincey,
57-
config: config.clone(),
58-
constants: constants.clone(),
59-
outbound_tx_channel: tx_channel,
60-
};
61-
62-
// Set up tx submission
63-
let (submit_channel, submit_jh) = submit.spawn();
39+
// Set up the submit task. This will be either a Flashbots task or a
40+
// BuilderHelper task depending on whether a Flashbots endpoint is
41+
// configured.
42+
let (submit_channel, submit_jh) = config.spawn_submit_task(tx_channel).await?;
6443

6544
// Set up the simulator
6645
let sim = Simulator::new(&config, ru_provider.clone(), block_env);
67-
let build_jh = sim.spawn_simulator_task(constants, cache_system.sim_cache, submit_channel);
46+
let build_jh =
47+
sim.spawn_simulator_task(config.constants.clone(), cache_system.sim_cache, submit_channel);
6848

6949
// Start the healthcheck server
7050
let server = serve_builder(([0, 0, 0, 0], config.builder_port));

src/config.rs

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1-
use crate::{quincey::Quincey, tasks::block::cfg::SignetCfgEnv};
1+
use crate::{
2+
quincey::Quincey,
3+
tasks::{
4+
block::{cfg::SignetCfgEnv, sim::SimResult},
5+
submit::{BuilderHelperTask, FlashbotsTask},
6+
},
7+
};
28
use alloy::{
39
network::{Ethereum, EthereumWallet},
4-
primitives::Address,
10+
primitives::{Address, TxHash},
511
providers::{
612
Identity, ProviderBuilder, RootProvider,
713
fillers::{
@@ -15,14 +21,16 @@ use init4_bin_base::{
1521
perms::{Authenticator, OAuthConfig, SharedToken},
1622
utils::{
1723
calc::SlotCalculator,
24+
flashbots::Flashbots,
1825
from_env::FromEnv,
1926
provider::{ProviderConfig, PubSubConfig},
2027
signer::{LocalOrAws, SignerError},
2128
},
2229
};
30+
use signet_constants::SignetSystemConstants;
2331
use signet_zenith::Zenith;
2432
use std::borrow::Cow;
25-
use tokio::join;
33+
use tokio::{join, sync::mpsc::UnboundedSender, task::JoinHandle};
2634

2735
/// Type alias for the provider used to simulate against rollup state.
2836
pub type RuProvider = RootProvider<Ethereum>;
@@ -89,13 +97,8 @@ pub struct BuilderConfig {
8997
)]
9098
pub tx_broadcast_urls: Vec<Cow<'static, str>>,
9199

92-
/// Flashbots endpoint for privately submitting rollup blocks.
93-
#[from_env(
94-
var = "FLASHBOTS_ENDPOINT",
95-
desc = "Flashbots endpoint for privately submitting rollup blocks",
96-
optional
97-
)]
98-
pub flashbots_endpoint: Option<url::Url>,
100+
/// Flashbots configuration for privately submitting rollup blocks.
101+
pub flashbots: init4_bin_base::utils::flashbots::FlashbotsConfig,
99102

100103
/// Address of the Zenith contract on Host.
101104
#[from_env(var = "ZENITH_ADDRESS", desc = "address of the Zenith contract on Host")]
@@ -163,12 +166,21 @@ pub struct BuilderConfig {
163166

164167
/// The slot calculator for the builder.
165168
pub slot_calculator: SlotCalculator,
169+
170+
/// The signet system constants.
171+
pub constants: SignetSystemConstants,
166172
}
167173

168174
impl BuilderConfig {
169175
/// Connect to the Builder signer.
170176
pub async fn connect_builder_signer(&self) -> Result<LocalOrAws, SignerError> {
171-
LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await
177+
static ONCE: tokio::sync::OnceCell<LocalOrAws> = tokio::sync::OnceCell::const_new();
178+
179+
ONCE.get_or_try_init(|| async {
180+
LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await
181+
})
182+
.await
183+
.cloned()
172184
}
173185

174186
/// Connect to the Sequencer signer.
@@ -275,4 +287,35 @@ impl BuilderConfig {
275287
.unwrap_or(DEFAULT_CONCURRENCY_LIMIT)
276288
})
277289
}
290+
291+
/// Connect to a Flashbots provider.
292+
pub async fn flashbots_provider(&self) -> eyre::Result<Flashbots> {
293+
self.flashbots
294+
.build(self.connect_builder_signer().await?)
295+
.ok_or_else(|| eyre::eyre!("Flashbots is not configured"))
296+
}
297+
298+
/// Spawn a submit task, either Flashbots or BuilderHelper depending on
299+
/// configuration.
300+
pub async fn spawn_submit_task(
301+
&self,
302+
tx_channel: UnboundedSender<TxHash>,
303+
) -> eyre::Result<(UnboundedSender<SimResult>, JoinHandle<()>)> {
304+
// If we have a flashbots endpoint, use that
305+
if self.flashbots.flashbots_endpoint.is_some() {
306+
// Make a Flashbots submission task
307+
let submit = FlashbotsTask::new(self.clone(), tx_channel).await?;
308+
309+
// Set up flashbots submission
310+
let (submit_channel, submit_jh) = submit.spawn();
311+
return Ok((submit_channel, submit_jh));
312+
}
313+
314+
// Make a Tx submission task
315+
let submit = BuilderHelperTask::new(self.clone(), tx_channel).await?;
316+
317+
// Set up tx submission
318+
let (submit_channel, submit_jh) = submit.spawn();
319+
Ok((submit_channel, submit_jh))
320+
}
278321
}

src/macros.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,33 @@
22
macro_rules! span_scoped {
33
($span:expr, $level:ident!($($arg:tt)*)) => {
44
$span.in_scope(|| {
5-
$level!($($arg)*);
6-
});
5+
::tracing::$level!($($arg)*);
6+
})
7+
};
8+
}
9+
10+
/// Helper macro to log a debug event within a span that is not currently
11+
/// entered.
12+
macro_rules! span_debug {
13+
($span:expr, $($arg:tt)*) => {
14+
span_scoped!($span, debug!($($arg)*))
15+
};
16+
}
17+
18+
/// Helper macro to log an info event within a span that is not currently
19+
/// entered.
20+
macro_rules! span_info {
21+
($span:expr, $($arg:tt)*) => {
22+
span_scoped!($span, info!($($arg)*))
23+
};
24+
25+
}
26+
27+
/// Helper macro to log a warning event within a span that is not currently
28+
/// entered.
29+
macro_rules! span_error {
30+
($span:expr, $($arg:tt)*) => {
31+
span_scoped!($span, error!($($arg)*))
732
};
833
}
934

src/quincey.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
use alloy::signers::Signer;
22
use eyre::bail;
3-
use init4_bin_base::{
4-
deps::tracing::{debug, info, instrument, trace},
5-
perms::SharedToken,
6-
utils::signer::LocalOrAws,
7-
};
3+
use init4_bin_base::{perms::SharedToken, utils::signer::LocalOrAws};
84
use reqwest::Client;
95
use signet_types::{SignRequest, SignResponse};
6+
use tracing::{debug, info, instrument, trace};
107

118
/// A quincey client for making requests to the Quincey API.
129
#[derive(Debug, Clone)]

src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use axum::{
44
response::{IntoResponse, Response},
55
routing::get,
66
};
7-
use init4_bin_base::deps::tracing::error;
87
use std::net::SocketAddr;
8+
use tracing::error;
99

1010
/// Return a 404 Not Found response
1111
pub async fn return_404() -> Response {

src/tasks/block/sim.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ use crate::{
66
tasks::env::SimEnv,
77
};
88
use alloy::{eips::BlockId, network::Ethereum};
9-
use init4_bin_base::{
10-
deps::tracing::{debug, error},
11-
utils::calc::SlotCalculator,
12-
};
9+
use init4_bin_base::utils::calc::SlotCalculator;
1310
use signet_sim::{BlockBuild, BuiltBlock, SimCache};
1411
use signet_types::constants::SignetSystemConstants;
1512
use std::time::{Duration, Instant};
@@ -20,7 +17,7 @@ use tokio::{
2017
},
2118
task::JoinHandle,
2219
};
23-
use tracing::{Instrument, info, instrument};
20+
use tracing::{Instrument, Span, instrument};
2421
use trevm::revm::{
2522
context::BlockEnv,
2623
database::{AlloyDB, WrapDatabaseAsync},
@@ -64,12 +61,12 @@ impl SimResult {
6461

6562
/// Returns a reference to the tracing span associated with this simulation
6663
/// result.
67-
pub const fn span(&self) -> &tracing::Span {
64+
pub const fn span(&self) -> &Span {
6865
self.sim_env.span()
6966
}
7067

7168
/// Clones the span for use in other tasks.
72-
pub fn clone_span(&self) -> tracing::Span {
69+
pub fn clone_span(&self) -> Span {
7370
self.sim_env.clone_span()
7471
}
7572
}
@@ -116,7 +113,6 @@ impl Simulator {
116113
///
117114
/// A `Result` containing the built block or an error.
118115
#[instrument(skip_all, fields(
119-
block_number = block_env.number.to::<u64>(),
120116
tx_count = sim_items.len(),
121117
millis_to_deadline = finish_by.duration_since(Instant::now()).as_millis()
122118
))]
@@ -144,7 +140,7 @@ impl Simulator {
144140
);
145141

146142
let built_block = block_build.build().in_current_span().await;
147-
debug!(
143+
tracing::debug!(
148144
tx_count = built_block.tx_count(),
149145
block_number = built_block.block_number(),
150146
"block simulation completed",
@@ -171,7 +167,7 @@ impl Simulator {
171167
cache: SimCache,
172168
submit_sender: mpsc::UnboundedSender<SimResult>,
173169
) -> JoinHandle<()> {
174-
debug!("starting simulator task");
170+
tracing::debug!("starting simulator task");
175171

176172
tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await })
177173
}
@@ -201,16 +197,13 @@ impl Simulator {
201197
loop {
202198
// Wait for the block environment to be set
203199
if self.sim_env.changed().await.is_err() {
204-
error!("block_env channel closed - shutting down simulator task");
200+
tracing::error!("block_env channel closed - shutting down simulator task");
205201
return;
206202
}
207203
let Some(sim_env) = self.sim_env.borrow_and_update().clone() else { return };
208204

209205
let span = sim_env.span();
210-
211-
span.in_scope(|| {
212-
info!("new block environment received");
213-
});
206+
span_info!(span, "new block environment received");
214207

215208
// Calculate the deadline for this block simulation.
216209
// NB: This must happen _after_ taking a reference to the sim cache,
@@ -222,13 +215,13 @@ impl Simulator {
222215
.handle_build(constants.clone(), sim_cache, finish_by, sim_env.block_env.clone())
223216
.instrument(span.clone())
224217
.await
225-
.inspect_err(|err| span.in_scope(|| error!(%err, "error during block build")))
218+
.inspect_err(|err| span_error!(span, %err, "error during block build"))
226219
else {
227220
continue;
228221
};
229222

230223
let _guard = span.clone().entered();
231-
debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built simulated block");
224+
span_debug!(span, tx_count = block.transactions().len(), "built simulated block");
232225
let _ = submit_sender.send(SimResult { block, sim_env });
233226
}
234227
}

0 commit comments

Comments
 (0)