Skip to content

Commit 9038a1e

Browse files
committed
Test the simulator
1 parent 5b24f80 commit 9038a1e

File tree

16 files changed

+2091
-137
lines changed

16 files changed

+2091
-137
lines changed

crates/simulator/src/core.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
use crate::engine::SimulationEngine;
22
use crate::publisher::SimulationPublisher;
33
use crate::types::SimulationRequest;
4+
use async_trait::async_trait;
45
use eyre::Result;
5-
use reth_provider::StateProviderFactory;
66
use tracing::{error, info};
77

8-
/// Core bundle simulator that provides shared simulation logic
9-
/// Used by both mempool event simulators and ExEx event simulators
10-
pub struct BundleSimulator<E, P>
8+
/// Clean trait for bundle simulation without exposing Reth's complex types
9+
#[async_trait]
10+
pub trait BundleSimulator: Send + Sync {
11+
/// Simulate a bundle execution
12+
async fn simulate(&self, request: &SimulationRequest) -> Result<()>;
13+
}
14+
15+
/// Production bundle simulator for Reth
16+
/// This is the Reth-specific implementation
17+
pub struct RethBundleSimulator<E, P>
1118
where
1219
E: SimulationEngine,
1320
P: SimulationPublisher,
@@ -16,34 +23,28 @@ where
1623
publisher: P,
1724
}
1825

19-
impl<E, P> BundleSimulator<E, P>
26+
impl<E, P> RethBundleSimulator<E, P>
2027
where
2128
E: SimulationEngine,
2229
P: SimulationPublisher,
2330
{
2431
pub fn new(engine: E, publisher: P) -> Self {
25-
Self { engine, publisher }
32+
Self {
33+
engine,
34+
publisher,
35+
}
2636
}
37+
}
2738

28-
/// Process a simulation request by creating state provider from factory
29-
/// Convenience method that handles state provider creation
30-
pub async fn simulate<F>(
31-
&self,
32-
request: &SimulationRequest,
33-
state_provider_factory: &F,
34-
) -> Result<()>
35-
where
36-
F: StateProviderFactory,
37-
{
38-
// Get state provider for the block
39-
// FIXME: We probably want to get the state provider once per block rather than once per
40-
// bundle for each block.
41-
let state_provider = state_provider_factory
42-
.state_by_block_hash(request.block_hash)
43-
.map_err(|e| eyre::eyre!("Failed to get state provider: {}", e))?;
44-
45-
// Run the simulation
46-
match self.engine.simulate_bundle(request, &state_provider).await {
39+
#[async_trait]
40+
impl<E, P> BundleSimulator for RethBundleSimulator<E, P>
41+
where
42+
E: SimulationEngine + 'static,
43+
P: SimulationPublisher + 'static,
44+
{
45+
async fn simulate(&self, request: &SimulationRequest) -> Result<()> {
46+
// Run the simulation - engine will get its own state provider
47+
match self.engine.simulate_bundle(request).await {
4748
Ok(result) => {
4849
info!(
4950
bundle_id = %request.bundle_id,

crates/simulator/src/engine.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,10 @@ where
101101
#[async_trait]
102102
pub trait SimulationEngine: Send + Sync {
103103
/// Simulate a bundle execution
104-
async fn simulate_bundle<S>(
104+
async fn simulate_bundle(
105105
&self,
106106
request: &SimulationRequest,
107-
state_provider: &S,
108-
) -> Result<SimulationResult>
109-
where
110-
S: StateProvider + Send + Sync;
107+
) -> Result<SimulationResult>;
111108
}
112109

113110
#[derive(Clone)]
@@ -137,14 +134,10 @@ where
137134
Node: FullNodeComponents,
138135
<Node as FullNodeComponents>::Evm: ConfigureEvm<NextBlockEnvCtx = OpNextBlockEnvAttributes>,
139136
{
140-
async fn simulate_bundle<S>(
137+
async fn simulate_bundle(
141138
&self,
142139
request: &SimulationRequest,
143-
_state_provider: &S,
144-
) -> Result<SimulationResult>
145-
where
146-
S: StateProvider + Send + Sync,
147-
{
140+
) -> Result<SimulationResult> {
148141
let start_time = Instant::now();
149142
let simulation_id = Uuid::new_v4();
150143

crates/simulator/src/lib.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::sync::Arc;
1515
use tracing::{error, info};
1616

1717
pub use config::SimulatorNodeConfig;
18-
pub use core::BundleSimulator;
18+
pub use core::{BundleSimulator, RethBundleSimulator};
1919
pub use engine::{RethSimulationEngine, SimulationEngine};
2020
pub use listeners::{ExExEventListener, MempoolEventListener, MempoolListenerConfig};
2121
pub use publisher::{SimulationPublisher, TipsSimulationPublisher};
@@ -24,15 +24,14 @@ pub use worker_pool::SimulationWorkerPool;
2424

2525
// Type aliases for concrete implementations
2626
pub type TipsBundleSimulator<Node> =
27-
BundleSimulator<RethSimulationEngine<Node>, TipsSimulationPublisher>;
27+
RethBundleSimulator<RethSimulationEngine<Node>, TipsSimulationPublisher>;
2828
pub type TipsExExEventListener<Node> = ExExEventListener<
2929
Node,
30-
RethSimulationEngine<Node>,
31-
TipsSimulationPublisher,
30+
TipsBundleSimulator<Node>,
3231
tips_datastore::PostgresDatastore,
3332
>;
3433
pub type TipsMempoolEventListener<Node> =
35-
MempoolEventListener<Node, RethSimulationEngine<Node>, TipsSimulationPublisher>;
34+
MempoolEventListener<Node, TipsBundleSimulator<Node>>;
3635

3736
// Initialization functions
3837

@@ -43,7 +42,7 @@ where
4342
<Node as FullNodeComponents>::Evm: ConfigureEvm<NextBlockEnvCtx = OpNextBlockEnvAttributes>,
4443
{
4544
datastore: Arc<tips_datastore::PostgresDatastore>,
46-
simulator: BundleSimulator<RethSimulationEngine<Node>, TipsSimulationPublisher>,
45+
simulator: RethBundleSimulator<RethSimulationEngine<Node>, TipsSimulationPublisher>,
4746
}
4847

4948
/// Initialize common listener components (database, publisher, engine, core simulator)
@@ -81,7 +80,7 @@ where
8180
let engine = RethSimulationEngine::new(Arc::clone(&provider), evm_config);
8281
info!("Simulation engine initialized");
8382

84-
let simulator = BundleSimulator::new(engine, publisher);
83+
let simulator = RethBundleSimulator::new(engine, publisher);
8584
info!("Core bundle simulator initialized");
8685

8786
Ok(CommonListenerComponents {
@@ -119,7 +118,6 @@ where
119118

120119
let worker_pool = SimulationWorkerPool::new(
121120
Arc::new(common_components.simulator),
122-
Arc::clone(&provider),
123121
config.max_concurrent_simulations,
124122
);
125123

@@ -160,7 +158,6 @@ where
160158

161159
let worker_pool = SimulationWorkerPool::new(
162160
Arc::new(common_components.simulator),
163-
Arc::clone(&provider),
164161
max_concurrent_simulations,
165162
);
166163

@@ -183,9 +180,7 @@ where
183180
Node: FullNodeComponents,
184181
<Node as FullNodeComponents>::Evm: ConfigureEvm<NextBlockEnvCtx = OpNextBlockEnvAttributes>,
185182
{
186-
worker_pool: Arc<
187-
SimulationWorkerPool<RethSimulationEngine<Node>, TipsSimulationPublisher, Node::Provider>,
188-
>,
183+
worker_pool: Arc<SimulationWorkerPool<TipsBundleSimulator<Node>>>,
189184
exex_listener: TipsExExEventListener<Node>,
190185
mempool_listener: TipsMempoolEventListener<Node>,
191186
}
@@ -222,7 +217,6 @@ where
222217

223218
let shared_worker_pool = SimulationWorkerPool::new(
224219
Arc::new(common_components.simulator),
225-
Arc::clone(&provider),
226220
max_concurrent_simulations,
227221
);
228222

crates/simulator/src/listeners/exex.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use crate::engine::SimulationEngine;
2-
use crate::publisher::SimulationPublisher;
1+
use crate::core::BundleSimulator;
32
use crate::types::SimulationRequest;
43
use crate::worker_pool::{SimulationTask, SimulationWorkerPool};
54

@@ -62,33 +61,31 @@ where
6261

6362
/// ExEx event listener that processes chain events and queues bundle simulations
6463
/// Processes chain events (commits, reorgs, reverts) and queues simulation tasks
65-
pub struct ExExEventListener<Node, E, P, D>
64+
pub struct ExExEventListener<Node, B, D>
6665
where
6766
Node: FullNodeComponents,
68-
E: SimulationEngine + Clone + 'static,
69-
P: SimulationPublisher + Clone + 'static,
67+
B: BundleSimulator + 'static,
7068
D: tips_datastore::BundleDatastore,
7169
{
7270
/// The execution extension context
7371
ctx: ExExContext<Node>,
7472
/// Datastore for fetching bundles from mempool
7573
datastore: Arc<D>,
7674
/// Shared simulation worker pool
77-
worker_pool: Arc<SimulationWorkerPool<E, P, Node::Provider>>,
75+
worker_pool: Arc<SimulationWorkerPool<B>>,
7876
}
7977

80-
impl<Node, E, P, D> ExExEventListener<Node, E, P, D>
78+
impl<Node, B, D> ExExEventListener<Node, B, D>
8179
where
8280
Node: FullNodeComponents,
83-
E: SimulationEngine + Clone + 'static,
84-
P: SimulationPublisher + Clone + 'static,
81+
B: BundleSimulator + 'static,
8582
D: tips_datastore::BundleDatastore + 'static,
8683
{
8784
/// Create a new ExEx event listener
8885
pub fn new(
8986
ctx: ExExContext<Node>,
9087
datastore: Arc<D>,
91-
worker_pool: Arc<SimulationWorkerPool<E, P, Node::Provider>>,
88+
worker_pool: Arc<SimulationWorkerPool<B>>,
9289
) -> Self {
9390
Self {
9491
ctx,
@@ -190,12 +187,12 @@ where
190187
}
191188

192189
/// Process a single block for potential bundle simulations
193-
async fn process_block<B>(
190+
async fn process_block<Block>(
194191
&mut self,
195-
block: (&B256, &reth_primitives::RecoveredBlock<B>),
192+
block: (&B256, &reth_primitives::RecoveredBlock<Block>),
196193
) -> Result<()>
197194
where
198-
B: reth_node_api::Block,
195+
Block: reth_node_api::Block,
199196
{
200197
let (block_hash, sealed_block) = block;
201198
let block_number = sealed_block.number();

crates/simulator/src/listeners/mempool.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use crate::engine::SimulationEngine;
2-
use crate::publisher::SimulationPublisher;
1+
use crate::core::BundleSimulator;
32
use crate::types::SimulationRequest;
43
use crate::worker_pool::{SimulationTask, SimulationWorkerPool};
54
use alloy_primitives::B256;
@@ -31,11 +30,10 @@ pub struct MempoolListenerConfig {
3130
}
3231

3332
/// Mempool event listener that processes events and queues simulations
34-
pub struct MempoolEventListener<Node, E, P>
33+
pub struct MempoolEventListener<Node, B>
3534
where
3635
Node: FullNodeComponents,
37-
E: SimulationEngine,
38-
P: SimulationPublisher,
36+
B: BundleSimulator + 'static,
3937
{
4038
/// State provider factory for getting current block info
4139
provider: Arc<Node::Provider>,
@@ -44,20 +42,19 @@ where
4442
/// Kafka topic name
4543
topic: String,
4644
/// Shared simulation worker pool
47-
worker_pool: Arc<SimulationWorkerPool<E, P, Node::Provider>>,
45+
worker_pool: Arc<SimulationWorkerPool<B>>,
4846
}
4947

50-
impl<Node, E, P> MempoolEventListener<Node, E, P>
48+
impl<Node, B> MempoolEventListener<Node, B>
5149
where
5250
Node: FullNodeComponents,
53-
E: SimulationEngine + Clone + 'static,
54-
P: SimulationPublisher + Clone + 'static,
51+
B: BundleSimulator + 'static,
5552
{
5653
/// Create a new mempool event listener
5754
pub fn new(
5855
provider: Arc<Node::Provider>,
5956
config: MempoolListenerConfig,
60-
worker_pool: Arc<SimulationWorkerPool<E, P, Node::Provider>>,
57+
worker_pool: Arc<SimulationWorkerPool<B>>,
6158
) -> Result<Self> {
6259
let consumer: StreamConsumer = ClientConfig::new()
6360
.set("group.id", &config.kafka_group_id)
@@ -86,8 +83,7 @@ where
8683
/// Run the mempool event listener
8784
pub async fn run(self) -> Result<()>
8885
where
89-
E: 'static,
90-
P: 'static,
86+
B: 'static,
9187
{
9288
info!(
9389
topic = %self.topic,

crates/simulator/src/worker_pool.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use crate::core::BundleSimulator;
2-
use crate::engine::SimulationEngine;
3-
use crate::publisher::SimulationPublisher;
42
use crate::types::SimulationRequest;
5-
use reth_provider::StateProviderFactory;
63
use std::sync::atomic::{AtomicU64, Ordering};
74
use std::sync::Arc;
85
use tokio::sync::mpsc;
@@ -15,16 +12,12 @@ pub struct SimulationTask {
1512
}
1613

1714
/// Generic simulation worker pool that can be shared across different simulators
18-
pub struct SimulationWorkerPool<E, P, S>
15+
pub struct SimulationWorkerPool<B>
1916
where
20-
E: SimulationEngine,
21-
P: SimulationPublisher,
22-
S: StateProviderFactory,
17+
B: BundleSimulator,
2318
{
2419
/// Core bundle simulator
25-
simulator: Arc<BundleSimulator<E, P>>,
26-
/// State provider factory
27-
state_provider_factory: Arc<S>,
20+
simulator: Arc<B>,
2821
/// Channel for sending simulation requests to workers
2922
simulation_tx: mpsc::Sender<SimulationTask>,
3023
/// Channel for receiving simulation requests in workers
@@ -37,23 +30,19 @@ where
3730
max_concurrent: usize,
3831
}
3932

40-
impl<E, P, S> SimulationWorkerPool<E, P, S>
33+
impl<B> SimulationWorkerPool<B>
4134
where
42-
E: SimulationEngine + Clone + 'static,
43-
P: SimulationPublisher + Clone + 'static,
44-
S: reth_provider::StateProviderFactory + Send + Sync + 'static,
35+
B: BundleSimulator + 'static,
4536
{
4637
/// Create a new simulation worker pool
4738
pub fn new(
48-
simulator: Arc<BundleSimulator<E, P>>,
49-
state_provider_factory: Arc<S>,
39+
simulator: Arc<B>,
5040
max_concurrent_simulations: usize,
5141
) -> Arc<Self> {
5242
let (simulation_tx, simulation_rx) = mpsc::channel(1000);
5343

5444
Arc::new(Self {
5545
simulator,
56-
state_provider_factory,
5746
simulation_tx,
5847
simulation_rx: Arc::new(tokio::sync::Mutex::new(simulation_rx)),
5948
latest_block: AtomicU64::new(0),
@@ -145,7 +134,7 @@ where
145134
// Execute the simulation
146135
match pool
147136
.simulator
148-
.simulate(&task.request, pool.state_provider_factory.as_ref())
137+
.simulate(&task.request)
149138
.await
150139
{
151140
Ok(_) => {

0 commit comments

Comments
 (0)