Skip to content

Commit 48232cf

Browse files
committed
feat: add verification service for flashblocks with metrics tracking for block reorgs
1 parent 3f3d846 commit 48232cf

File tree

8 files changed

+685
-25
lines changed

8 files changed

+685
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ reth-rpc-types-compat = { git = "https://github.com/paradigmxyz/reth", tag = "v1
3333
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
3434
reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
3535
reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
36+
reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
3637

3738
# revm
3839
revm = { version = "22.0.1", default-features = false }

crates/flashblocks-rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ reth-rpc-types-compat.workspace = true
2626
reth-optimism-rpc.workspace = true
2727
reth-optimism-evm.workspace = true
2828
reth-optimism-chainspec.workspace = true
29+
reth-exex.workspace = true
2930

3031
# revm
3132
revm.workspace = true

crates/flashblocks-rpc/src/flashblocks.rs

Lines changed: 183 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -339,29 +339,37 @@ fn get_and_set_txs_and_receipts(
339339
metadata: Metadata,
340340
) -> Result<Vec<OpReceipt>, Box<dyn std::error::Error>> {
341341
let mut diff_receipts: Vec<OpReceipt> = vec![];
342-
// Store tx transaction signed
342+
let mut tx_hashes: Vec<String> = vec![];
343+
344+
if let Some(existing_hashes) = cache.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
345+
{
346+
tx_hashes = existing_hashes;
347+
}
348+
343349
for (idx, transaction) in block.body.transactions.iter().enumerate() {
344-
// check if exists, if not update
345-
let existing_tx = cache.get::<OpTransactionSigned>(&transaction.tx_hash().to_string());
350+
let tx_hash = transaction.tx_hash().to_string();
351+
352+
// Add transaction hash to the ordered list if not already present
353+
if !tx_hashes.contains(&tx_hash) {
354+
tx_hashes.push(tx_hash.clone());
355+
}
356+
357+
let existing_tx = cache.get::<OpTransactionSigned>(&tx_hash);
346358
if existing_tx.is_none() {
347-
if let Err(e) = cache.set(&transaction.tx_hash().to_string(), &transaction, Some(10)) {
359+
if let Err(e) = cache.set(&tx_hash, &transaction, Some(10)) {
348360
error!("Failed to set transaction in cache: {}", e);
349361
continue;
350362
}
351-
// update tx index
352363
if let Err(e) = cache.set(&format!("tx_idx:{}", transaction.tx_hash()), &idx, Some(10))
353364
{
354365
error!("Failed to set transaction index in cache: {}", e);
355366
continue;
356367
}
357368

358-
// update tx count for each from address
359369
if let Ok(from) = transaction.recover_signer() {
360-
// Get current tx count, default to 0 if not found
361370
let current_count = cache
362371
.get::<u64>(&format!("tx_count:{}:{}", from, block_number))
363372
.unwrap_or(0);
364-
// Increment tx count by 1
365373
if let Err(e) = cache.set(
366374
&format!("tx_count:{}:{}", from, block_number),
367375
&(current_count + 1),
@@ -370,7 +378,6 @@ fn get_and_set_txs_and_receipts(
370378
error!("Failed to set transaction count in cache: {}", e);
371379
}
372380

373-
// also keep track of sender of each transaction
374381
if let Err(e) = cache.set(
375382
&format!("tx_sender:{}", transaction.tx_hash()),
376383
&from,
@@ -379,7 +386,6 @@ fn get_and_set_txs_and_receipts(
379386
error!("Failed to set transaction sender in cache: {}", e);
380387
}
381388

382-
// also keep track of the block number of each transaction
383389
if let Err(e) = cache.set(
384390
&format!("tx_block_number:{}", transaction.tx_hash()),
385391
&block_number,
@@ -391,26 +397,16 @@ fn get_and_set_txs_and_receipts(
391397
}
392398

393399
// TODO: move this into the transaction check
394-
if metadata
395-
.receipts
396-
.contains_key(&transaction.tx_hash().to_string())
397-
{
400+
if metadata.receipts.contains_key(&tx_hash) {
398401
// find receipt in metadata and set it in cache
399-
let receipt = metadata
400-
.receipts
401-
.get(&transaction.tx_hash().to_string())
402-
.unwrap();
403-
if let Err(e) = cache.set(
404-
&format!("receipt:{:?}", transaction.tx_hash().to_string()),
405-
receipt,
406-
Some(10),
407-
) {
402+
let receipt = metadata.receipts.get(&tx_hash).unwrap();
403+
if let Err(e) = cache.set(&format!("receipt:{:?}", tx_hash), receipt, Some(10)) {
408404
error!("Failed to set receipt in cache: {}", e);
409405
continue;
410406
}
411407
// map receipt's block number as well
412408
if let Err(e) = cache.set(
413-
&format!("receipt_block:{:?}", transaction.tx_hash().to_string()),
409+
&format!("receipt_block:{:?}", tx_hash),
414410
&block_number,
415411
Some(10),
416412
) {
@@ -422,6 +418,10 @@ fn get_and_set_txs_and_receipts(
422418
}
423419
}
424420

421+
if let Err(e) = cache.set(&format!("tx_hashes:{}", block_number), &tx_hashes, Some(10)) {
422+
error!("Failed to update transaction hashes list in cache: {}", e);
423+
}
424+
425425
Ok(diff_receipts)
426426
}
427427

@@ -562,7 +562,7 @@ mod tests {
562562
payload_id: PayloadId::new([0; 8]),
563563
base: None,
564564
diff: delta2,
565-
metadata: serde_json::to_value(metadata2).unwrap(),
565+
metadata: serde_json::to_value(metadata2.clone()).unwrap(),
566566
}
567567
}
568568

@@ -688,4 +688,162 @@ mod tests {
688688
// Verify no block was stored, since it skips the first payload
689689
assert!(cache.get::<OpBlock>("pending").is_none());
690690
}
691+
692+
#[test]
693+
fn test_tx_hash_list_storage_and_deduplication() {
694+
let cache = Arc::new(Cache::default());
695+
let block_number = 1;
696+
697+
let base = ExecutionPayloadBaseV1 {
698+
parent_hash: Default::default(),
699+
parent_beacon_block_root: Default::default(),
700+
fee_recipient: Address::from_str("0x1234567890123456789012345678901234567890").unwrap(),
701+
block_number,
702+
gas_limit: 1000000,
703+
timestamp: 1234567890,
704+
prev_randao: Default::default(),
705+
extra_data: Default::default(),
706+
base_fee_per_gas: U256::from(1000),
707+
};
708+
709+
let tx1 = Bytes::from_str("0x02f87483014a3482017e8459682f0084596830a98301f1d094b01866f195533de16eb929b73f87280693ca0cb480844e71d92dc001a0a658c18bdba29dd4022ee6640fdd143691230c12b3c8c86cf5c1a1f1682cc1e2a0248a28763541ebed2b87ecea63a7024b5c2b7de58539fa64c887b08f5faf29c1").unwrap();
710+
711+
let delta1 = ExecutionPayloadFlashblockDeltaV1 {
712+
transactions: vec![tx1.clone()],
713+
withdrawals: vec![],
714+
state_root: Default::default(),
715+
receipts_root: Default::default(),
716+
logs_bloom: Default::default(),
717+
gas_used: 21000,
718+
block_hash: Default::default(),
719+
};
720+
721+
let tx1_hash =
722+
"0x3cbbc9a6811ac5b2a2e5780bdb67baffc04246a59f39e398be048f1b2d05460c".to_string();
723+
724+
let metadata1 = Metadata {
725+
block_number,
726+
receipts: {
727+
let mut receipts = HashMap::default();
728+
receipts.insert(
729+
tx1_hash.clone(),
730+
OpReceipt::Legacy(Receipt {
731+
status: true.into(),
732+
cumulative_gas_used: 21000,
733+
logs: vec![],
734+
}),
735+
);
736+
receipts
737+
},
738+
new_account_balances: HashMap::default(),
739+
};
740+
741+
let payload1 = FlashblocksPayloadV1 {
742+
index: 0,
743+
payload_id: PayloadId::new([0; 8]),
744+
base: Some(base),
745+
diff: delta1,
746+
metadata: serde_json::to_value(metadata1).unwrap(),
747+
};
748+
749+
process_payload(payload1, cache.clone());
750+
751+
let tx_hashes1 = cache
752+
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
753+
.unwrap();
754+
assert_eq!(tx_hashes1.len(), 1);
755+
assert_eq!(tx_hashes1[0], tx1_hash);
756+
757+
let tx2 = Bytes::from_str("0xf8cd82016d8316e5708302c01c94f39635f2adf40608255779ff742afe13de31f57780b8646e530e9700000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000156ddc81eed2a36d68302948ba0a608703e79b22164f74523d188a11f81c25a65dd59535bab1cd1d8b30d115f3ea07f4cfbbad77a139c9209d3bded89091867ff6b548dd714109c61d1f8e7a84d14").unwrap();
758+
759+
let tx2_hash =
760+
"0xa6155b295085d3b87a3c86e342fe11c3b22f9952d0d85d9d34d223b7d6a17cd8".to_string();
761+
762+
let delta2 = ExecutionPayloadFlashblockDeltaV1 {
763+
transactions: vec![tx1.clone(), tx2.clone()], // Note tx1 is repeated
764+
withdrawals: vec![],
765+
state_root: B256::repeat_byte(0x1),
766+
receipts_root: B256::repeat_byte(0x2),
767+
logs_bloom: Default::default(),
768+
gas_used: 42000,
769+
block_hash: B256::repeat_byte(0x3),
770+
};
771+
772+
let metadata2 = Metadata {
773+
block_number,
774+
receipts: {
775+
let mut receipts = HashMap::default();
776+
receipts.insert(
777+
tx1_hash.clone(),
778+
OpReceipt::Legacy(Receipt {
779+
status: true.into(),
780+
cumulative_gas_used: 21000,
781+
logs: vec![],
782+
}),
783+
);
784+
receipts.insert(
785+
tx2_hash.clone(),
786+
OpReceipt::Legacy(Receipt {
787+
status: true.into(),
788+
cumulative_gas_used: 42000,
789+
logs: vec![],
790+
}),
791+
);
792+
receipts
793+
},
794+
new_account_balances: HashMap::default(),
795+
};
796+
797+
let payload2 = FlashblocksPayloadV1 {
798+
index: 1,
799+
payload_id: PayloadId::new([0; 8]),
800+
base: None,
801+
diff: delta2,
802+
metadata: serde_json::to_value(metadata2.clone()).unwrap(),
803+
};
804+
805+
process_payload(payload2, cache.clone());
806+
807+
let tx_hashes2 = cache
808+
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
809+
.unwrap();
810+
assert_eq!(
811+
tx_hashes2.len(),
812+
2,
813+
"Should have 2 unique transaction hashes"
814+
);
815+
assert_eq!(tx_hashes2[0], tx1_hash, "First hash should be tx1");
816+
assert_eq!(tx_hashes2[1], tx2_hash, "Second hash should be tx2");
817+
818+
let delta3 = ExecutionPayloadFlashblockDeltaV1 {
819+
transactions: vec![tx2.clone(), tx1.clone()], // Different order
820+
withdrawals: vec![],
821+
state_root: B256::repeat_byte(0x1),
822+
receipts_root: B256::repeat_byte(0x2),
823+
logs_bloom: Default::default(),
824+
gas_used: 42000,
825+
block_hash: B256::repeat_byte(0x3),
826+
};
827+
828+
let payload3 = FlashblocksPayloadV1 {
829+
index: 2,
830+
payload_id: PayloadId::new([0; 8]),
831+
base: None,
832+
diff: delta3,
833+
metadata: serde_json::to_value(metadata2).unwrap(), // Same metadata
834+
};
835+
836+
process_payload(payload3, cache.clone());
837+
838+
let tx_hashes3 = cache
839+
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
840+
.unwrap();
841+
assert_eq!(
842+
tx_hashes3.len(),
843+
2,
844+
"Should still have 2 unique transaction hashes"
845+
);
846+
assert_eq!(tx_hashes3[0], tx1_hash, "First hash should be tx1");
847+
assert_eq!(tx_hashes3[1], tx2_hash, "Second hash should be tx2");
848+
}
691849
}

crates/flashblocks-rpc/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod cache;
22
pub mod flashblocks;
33
mod metrics;
44
pub mod rpc;
5+
pub mod verification;
56

67
#[cfg(test)]
78
mod integration;

crates/flashblocks-rpc/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,16 @@ pub struct Metrics {
2626

2727
#[metric(describe = "Count of times flashblocks get_block_by_number is called")]
2828
pub get_block_by_number: Counter,
29+
30+
#[metric(describe = "Count of successful block verifications")]
31+
pub block_verification_success: Counter,
32+
33+
#[metric(describe = "Count of failed block verifications")]
34+
pub block_verification_failure: Counter,
35+
36+
#[metric(describe = "Count of blocks not found in cache during verification")]
37+
pub block_verification_not_found: Counter,
38+
39+
#[metric(describe = "Count of transaction count mismatches during verification")]
40+
pub block_verification_tx_count_mismatch: Counter,
2941
}

0 commit comments

Comments
 (0)