@@ -111,6 +111,7 @@ impl SubmitTask {
111
111
async fn next_host_block_height ( & self ) -> eyre:: Result < u64 > {
112
112
let result = self . provider ( ) . get_block_number ( ) . await ?;
113
113
let next = result. checked_add ( 1 ) . ok_or_else ( || eyre ! ( "next host block height overflow" ) ) ?;
114
+ debug ! ( next, "next host block height" ) ;
114
115
Ok ( next)
115
116
}
116
117
@@ -257,7 +258,6 @@ impl SubmitTask {
257
258
nonce = ?tx. nonce,
258
259
"sending transaction to network"
259
260
) ;
260
-
261
261
262
262
// assign the nonce and fill the rest of the values
263
263
let SendableTx :: Envelope ( tx) = self . provider ( ) . fill ( tx) . await ? else {
@@ -285,12 +285,15 @@ impl SubmitTask {
285
285
return Ok ( ControlFlow :: Skip ) ;
286
286
}
287
287
288
+ // Okay so the code gets all the way to this log
289
+ // but we don't see the tx hash in the logs or in the explorer,
290
+ // not even as a failed TX, just not at all.
288
291
info ! (
289
292
tx_hash = %tx. tx_hash( ) ,
290
293
ru_chain_id = %resp. req. ru_chain_id,
291
294
gas_limit = %resp. req. gas_limit,
292
295
"dispatched to network"
293
- ) ;
296
+ ) ;
294
297
295
298
Ok ( ControlFlow :: Done )
296
299
}
@@ -336,22 +339,31 @@ impl SubmitTask {
336
339
let span = debug_span ! ( "SubmitTask::retrying_handle_inbound" , retries) ;
337
340
debug ! ( retries, "number of retries" ) ;
338
341
339
- let inbound_result = match self . handle_inbound ( retries, block) . instrument ( span. clone ( ) ) . await {
342
+ let inbound_result = match self
343
+ . handle_inbound ( retries, block)
344
+ . instrument ( span. clone ( ) )
345
+ . await
346
+ {
340
347
Ok ( control_flow) => {
341
348
debug ! ( ?control_flow, retries, "successfully handled inbound block" ) ;
342
349
control_flow
343
350
}
344
351
Err ( err) => {
352
+ // Log the retry attempt
345
353
retries += 1 ;
346
354
error ! ( error = %err, "error handling inbound block" ) ;
347
355
356
+ // Delay until next slot if we get a 403 error
348
357
if err. to_string ( ) . contains ( "403" ) {
349
- debug ! ( "403 error - skipping block" ) ;
350
- let ( slot_number, start, end) = self . calculate_slot_window ( ) ?;
351
- debug ! ( slot_number, start, end, "403 sleep until skipping block" ) ;
352
- // TODO: Sleep until the end of the next slot and return retry
353
- return Ok ( ControlFlow :: Done ) ;
358
+ let ( slot_number, _, end) = self . calculate_slot_window ( ) ?;
359
+ let now = self . now ( ) ;
360
+ if end > now {
361
+ let sleep_duration = std:: time:: Duration :: from_secs ( end - now) ;
362
+ debug ! ( sleep_duration = ?sleep_duration, slot_number, "403 detected - sleeping until end of slot" ) ;
363
+ tokio:: time:: sleep ( sleep_duration) . await ;
364
+ }
354
365
}
366
+
355
367
ControlFlow :: Retry
356
368
}
357
369
} ;
@@ -413,19 +425,36 @@ impl SubmitTask {
413
425
}
414
426
415
427
/// Task future for the submit task
428
+ /// NB: This task assumes that the simulator will only send it blocks for
429
+ /// slots that it's assigned.
416
430
async fn task_future ( self , mut inbound : mpsc:: UnboundedReceiver < BuiltBlock > ) {
431
+ // Holds a reference to the last block we attempted to submit
432
+ let mut last_block_attempted: u64 = 0 ;
433
+
417
434
loop {
435
+ // Wait to receive a new block
418
436
let Some ( block) = inbound. recv ( ) . await else {
419
437
debug ! ( "upstream task gone" ) ;
420
438
break ;
421
439
} ;
422
- debug ! ( ?block, "submit channel received block" ) ;
440
+ debug ! ( block_number = block. block_number( ) , ?block, "submit channel received block" ) ;
441
+
442
+ // Check if a block number was set and skip if not
443
+ if block. block_number ( ) == 0 {
444
+ debug ! ( "block number is 0 - skipping" ) ;
445
+ continue ;
446
+ }
447
+
448
+ // Only attempt each block number once
449
+ if block. block_number ( ) == last_block_attempted {
450
+ debug ! ( "block number is unchanged from last attempt - skipping" ) ;
451
+ continue ;
452
+ }
423
453
424
- // TODO: Pass a BlockEnv to this function to give retrying handle inbound access to the block
425
- // env and thus the block number so that we can be sure that we try for only our assigned slots.
454
+ // This means we have encountered a new block, so reset the last block attempted
455
+ last_block_attempted = block. block_number ( ) ;
456
+ debug ! ( last_block_attempted, "resetting last block attempted" ) ;
426
457
427
- // Instead this needs to fire off a task that attempts to land the block for the given slot
428
- // Once that slot is up, it's invalid for the next anyway, so this job can be ephemeral.
429
458
if self . retrying_handle_inbound ( & block, 3 ) . await . is_err ( ) {
430
459
debug ! ( "error handling inbound block" ) ;
431
460
continue ;
0 commit comments