@@ -2,18 +2,18 @@ use super::bundler::{Bundle, BundlePoller};
2
2
use super :: oauth:: Authenticator ;
3
3
use super :: tx_poller:: TxPoller ;
4
4
use crate :: config:: { BuilderConfig , WalletlessProvider } ;
5
- use alloy:: primitives:: { keccak256, Bytes , B256 } ;
6
- use alloy:: providers:: Provider ;
7
5
use alloy:: {
8
6
consensus:: { SidecarBuilder , SidecarCoder , TxEnvelope } ,
9
7
eips:: eip2718:: Decodable2718 ,
8
+ primitives:: { keccak256, Bytes , B256 } ,
9
+ providers:: Provider as _,
10
+ rlp:: Buf ,
10
11
} ;
11
- use alloy_rlp:: Buf ;
12
12
use std:: time:: { SystemTime , UNIX_EPOCH } ;
13
13
use std:: { sync:: OnceLock , time:: Duration } ;
14
14
use tokio:: { sync:: mpsc, task:: JoinHandle } ;
15
- use tracing:: Instrument ;
16
- use zenith_types:: { encode_txns, Alloy2718Coder } ;
15
+ use tracing:: { debug , error , info , trace , Instrument } ;
16
+ use zenith_types:: { encode_txns, Alloy2718Coder , ZenithEthBundle } ;
17
17
18
18
/// Ethereum's slot time in seconds.
19
19
pub const ETHEREUM_SLOT_TIME : u64 = 12 ;
@@ -56,22 +56,22 @@ impl InProgressBlock {
56
56
57
57
/// Ingest a transaction into the in-progress block. Fails
58
58
pub fn ingest_tx ( & mut self , tx : & TxEnvelope ) {
59
- tracing :: trace!( hash = %tx. tx_hash( ) , "ingesting tx" ) ;
59
+ trace ! ( hash = %tx. tx_hash( ) , "ingesting tx" ) ;
60
60
self . unseal ( ) ;
61
61
self . transactions . push ( tx. clone ( ) ) ;
62
62
}
63
63
64
64
/// Remove a transaction from the in-progress block.
65
65
pub fn remove_tx ( & mut self , tx : & TxEnvelope ) {
66
- tracing :: trace!( hash = %tx. tx_hash( ) , "removing tx" ) ;
66
+ trace ! ( hash = %tx. tx_hash( ) , "removing tx" ) ;
67
67
self . unseal ( ) ;
68
68
self . transactions . retain ( |t| t. tx_hash ( ) != tx. tx_hash ( ) ) ;
69
69
}
70
70
71
71
/// Ingest a bundle into the in-progress block.
72
72
/// Ignores Signed Orders for now.
73
73
pub fn ingest_bundle ( & mut self , bundle : Bundle ) {
74
- tracing :: trace!( bundle = %bundle. id, "ingesting bundle" ) ;
74
+ trace ! ( bundle = %bundle. id, "ingesting bundle" ) ;
75
75
76
76
let txs = bundle
77
77
. bundle
@@ -87,7 +87,7 @@ impl InProgressBlock {
87
87
// As this builder does not provide bundles landing "top of block", its fine to just extend.
88
88
self . transactions . extend ( txs) ;
89
89
} else {
90
- tracing :: error!( "failed to decode bundle. dropping" ) ;
90
+ error ! ( "failed to decode bundle. dropping" ) ;
91
91
}
92
92
}
93
93
@@ -139,39 +139,51 @@ impl BlockBuilder {
139
139
}
140
140
}
141
141
142
+ /// Fetches transactions from the cache and ingests them into the in progress block
142
143
async fn get_transactions ( & mut self , in_progress : & mut InProgressBlock ) {
143
- tracing :: trace!( "query transactions from cache" ) ;
144
+ trace ! ( "query transactions from cache" ) ;
144
145
let txns = self . tx_poller . check_tx_cache ( ) . await ;
145
146
match txns {
146
147
Ok ( txns) => {
147
- tracing :: trace!( "got transactions response" ) ;
148
+ trace ! ( "got transactions response" ) ;
148
149
for txn in txns. into_iter ( ) {
149
150
in_progress. ingest_tx ( & txn) ;
150
151
}
151
152
}
152
153
Err ( e) => {
153
- tracing :: error!( error = %e, "error polling transactions" ) ;
154
+ error ! ( error = %e, "error polling transactions" ) ;
154
155
}
155
156
}
156
157
}
157
158
158
- async fn _get_bundles ( & mut self , in_progress : & mut InProgressBlock ) {
159
- tracing:: trace!( "query bundles from cache" ) ;
159
+ /// Fetches bundles from the cache and ingests them into the in progress block
160
+ async fn get_bundles ( & mut self , in_progress : & mut InProgressBlock ) {
161
+ trace ! ( "query bundles from cache" ) ;
160
162
let bundles = self . bundle_poller . check_bundle_cache ( ) . await ;
161
163
match bundles {
162
164
Ok ( bundles) => {
163
- tracing:: trace!( "got bundles response" ) ;
164
165
for bundle in bundles {
165
- in_progress. ingest_bundle ( bundle) ;
166
+ match self . simulate_bundle ( & bundle. bundle ) . await {
167
+ Ok ( ( ) ) => in_progress. ingest_bundle ( bundle. clone ( ) ) ,
168
+ Err ( e) => error ! ( error = %e, id = ?bundle. id, "bundle simulation failed" ) ,
169
+ }
166
170
}
167
171
}
168
172
Err ( e) => {
169
- tracing :: error!( error = %e, "error polling bundles" ) ;
173
+ error ! ( error = %e, "error polling bundles" ) ;
170
174
}
171
175
}
172
176
self . bundle_poller . evict ( ) ;
173
177
}
174
178
179
+ /// Simulates a Zenith bundle against the rollup state
180
+ async fn simulate_bundle ( & mut self , bundle : & ZenithEthBundle ) -> eyre:: Result < ( ) > {
181
+ // TODO: Simulate bundles with the Simulation Engine
182
+ // [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
183
+ debug ! ( hash = ?bundle. bundle. bundle_hash( ) , block_number = ?bundle. block_number( ) , "bundle simulations is not implemented yet - skipping simulation" ) ;
184
+ Ok ( ( ) )
185
+ }
186
+
175
187
async fn filter_transactions ( & self , in_progress : & mut InProgressBlock ) {
176
188
// query the rollup node to see which transaction(s) have been included
177
189
let mut confirmed_transactions = Vec :: new ( ) ;
@@ -185,7 +197,7 @@ impl BlockBuilder {
185
197
confirmed_transactions. push ( transaction. clone ( ) ) ;
186
198
}
187
199
}
188
- tracing :: trace!( confirmed = confirmed_transactions. len( ) , "found confirmed transactions" ) ;
200
+ trace ! ( confirmed = confirmed_transactions. len( ) , "found confirmed transactions" ) ;
189
201
190
202
// remove already-confirmed transactions
191
203
for transaction in confirmed_transactions {
@@ -213,32 +225,98 @@ impl BlockBuilder {
213
225
loop {
214
226
// sleep the buffer time
215
227
tokio:: time:: sleep ( Duration :: from_secs ( self . secs_to_next_target ( ) ) ) . await ;
216
- tracing :: info!( "beginning block build cycle" ) ;
228
+ info ! ( "beginning block build cycle" ) ;
217
229
218
230
// Build a block
219
231
let mut in_progress = InProgressBlock :: default ( ) ;
220
232
self . get_transactions ( & mut in_progress) . await ;
221
-
222
- // TODO: Implement bundle ingestion #later
223
- // self.get_bundles(&mut in_progress).await;
233
+ self . get_bundles ( & mut in_progress) . await ;
224
234
225
235
// Filter confirmed transactions from the block
226
236
self . filter_transactions ( & mut in_progress) . await ;
227
237
228
238
// submit the block if it has transactions
229
239
if !in_progress. is_empty ( ) {
230
- tracing :: debug!( txns = in_progress. len( ) , "sending block to submit task" ) ;
240
+ debug ! ( txns = in_progress. len( ) , "sending block to submit task" ) ;
231
241
let in_progress_block = std:: mem:: take ( & mut in_progress) ;
232
242
if outbound. send ( in_progress_block) . is_err ( ) {
233
- tracing :: error!( "downstream task gone" ) ;
243
+ error ! ( "downstream task gone" ) ;
234
244
break ;
235
245
}
236
246
} else {
237
- tracing :: debug!( "no transactions, skipping block submission" ) ;
247
+ debug ! ( "no transactions, skipping block submission" ) ;
238
248
}
239
249
}
240
250
}
241
251
. in_current_span ( ) ,
242
252
)
243
253
}
244
254
}
255
+
256
+ #[ cfg( test) ]
257
+ mod tests {
258
+ use super :: * ;
259
+ use alloy:: primitives:: Address ;
260
+ use alloy:: {
261
+ eips:: eip2718:: Encodable2718 ,
262
+ network:: { EthereumWallet , TransactionBuilder } ,
263
+ rpc:: types:: { mev:: EthSendBundle , TransactionRequest } ,
264
+ signers:: local:: PrivateKeySigner ,
265
+ } ;
266
+ use zenith_types:: ZenithEthBundle ;
267
+
268
+ /// Create a mock bundle for testing with a single transaction
269
+ async fn create_mock_bundle ( wallet : & EthereumWallet ) -> Bundle {
270
+ let tx = TransactionRequest :: default ( )
271
+ . to ( Address :: ZERO )
272
+ . from ( wallet. default_signer ( ) . address ( ) )
273
+ . nonce ( 1 )
274
+ . max_fee_per_gas ( 2 )
275
+ . max_priority_fee_per_gas ( 3 )
276
+ . gas_limit ( 4 )
277
+ . build ( wallet)
278
+ . await
279
+ . unwrap ( )
280
+ . encoded_2718 ( ) ;
281
+
282
+ let eth_bundle = EthSendBundle {
283
+ txs : vec ! [ tx. into( ) ] ,
284
+ block_number : 1 ,
285
+ min_timestamp : Some ( u64:: MIN ) ,
286
+ max_timestamp : Some ( u64:: MAX ) ,
287
+ reverting_tx_hashes : vec ! [ ] ,
288
+ replacement_uuid : Some ( "replacement_uuid" . to_owned ( ) ) ,
289
+ } ;
290
+
291
+ let zenith_bundle = ZenithEthBundle { bundle : eth_bundle, host_fills : None } ;
292
+
293
+ Bundle { id : "mock_bundle" . to_owned ( ) , bundle : zenith_bundle }
294
+ }
295
+
296
+ #[ tokio:: test]
297
+ async fn test_ingest_bundle ( ) {
298
+ // Setup random creds
299
+ let signer = PrivateKeySigner :: random ( ) ;
300
+ let wallet = EthereumWallet :: from ( signer) ;
301
+
302
+ // Create an empty InProgressBlock and bundle
303
+ let mut in_progress_block = InProgressBlock :: new ( ) ;
304
+ let bundle = create_mock_bundle ( & wallet) . await ;
305
+
306
+ // Save previous hash for comparison
307
+ let prev_hash = in_progress_block. contents_hash ( ) ;
308
+
309
+ // Ingest the bundle
310
+ in_progress_block. ingest_bundle ( bundle) ;
311
+
312
+ // Assert hash is changed after ingest
313
+ assert_ne ! ( prev_hash, in_progress_block. contents_hash( ) , "Bundle should change block hash" ) ;
314
+
315
+ // Assert that the transaction was persisted into block
316
+ assert_eq ! ( in_progress_block. len( ) , 1 , "Bundle should be persisted" ) ;
317
+
318
+ // Assert that the block is properly sealed
319
+ let raw_encoding = in_progress_block. encode_raw ( ) ;
320
+ assert ! ( !raw_encoding. is_empty( ) , "Raw encoding should not be empty" ) ;
321
+ }
322
+ }
0 commit comments