Skip to content

feat: add verification service for flashblocks with metrics tracking for block reorgs #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-rpc-types-compat = { git = "https://github.com/paradigmxyz/reth", tag = "v1
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" }

# revm
revm = { version = "22.0.1", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/flashblocks-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ reth-rpc-types-compat.workspace = true
reth-optimism-rpc.workspace = true
reth-optimism-evm.workspace = true
reth-optimism-chainspec.workspace = true
reth-exex.workspace = true

# revm
revm.workspace = true
Expand Down
208 changes: 183 additions & 25 deletions crates/flashblocks-rpc/src/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,37 @@ fn get_and_set_txs_and_receipts(
metadata: Metadata,
) -> Result<Vec<OpReceipt>, Box<dyn std::error::Error>> {
let mut diff_receipts: Vec<OpReceipt> = vec![];
// Store tx transaction signed
let mut tx_hashes: Vec<String> = vec![];

if let Some(existing_hashes) = cache.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
{
tx_hashes = existing_hashes;
}

for (idx, transaction) in block.body.transactions.iter().enumerate() {
// check if exists, if not update
let existing_tx = cache.get::<OpTransactionSigned>(&transaction.tx_hash().to_string());
let tx_hash = transaction.tx_hash().to_string();

// Add transaction hash to the ordered list if not already present
if !tx_hashes.contains(&tx_hash) {
tx_hashes.push(tx_hash.clone());
}

let existing_tx = cache.get::<OpTransactionSigned>(&tx_hash);
if existing_tx.is_none() {
if let Err(e) = cache.set(&transaction.tx_hash().to_string(), &transaction, Some(10)) {
if let Err(e) = cache.set(&tx_hash, &transaction, Some(10)) {
error!("Failed to set transaction in cache: {}", e);
continue;
}
// update tx index
if let Err(e) = cache.set(&format!("tx_idx:{}", transaction.tx_hash()), &idx, Some(10))
{
error!("Failed to set transaction index in cache: {}", e);
continue;
}

// update tx count for each from address
if let Ok(from) = transaction.recover_signer() {
// Get current tx count, default to 0 if not found
let current_count = cache
.get::<u64>(&format!("tx_count:{}:{}", from, block_number))
.unwrap_or(0);
// Increment tx count by 1
if let Err(e) = cache.set(
&format!("tx_count:{}:{}", from, block_number),
&(current_count + 1),
Expand All @@ -399,7 +407,6 @@ fn get_and_set_txs_and_receipts(
error!("Failed to set transaction count in cache: {}", e);
}

// also keep track of sender of each transaction
if let Err(e) = cache.set(
&format!("tx_sender:{}", transaction.tx_hash()),
&from,
Expand All @@ -408,7 +415,6 @@ fn get_and_set_txs_and_receipts(
error!("Failed to set transaction sender in cache: {}", e);
}

// also keep track of the block number of each transaction
if let Err(e) = cache.set(
&format!("tx_block_number:{}", transaction.tx_hash()),
&block_number,
Expand All @@ -420,26 +426,16 @@ fn get_and_set_txs_and_receipts(
}

// TODO: move this into the transaction check
if metadata
.receipts
.contains_key(&transaction.tx_hash().to_string())
{
if metadata.receipts.contains_key(&tx_hash) {
// find receipt in metadata and set it in cache
let receipt = metadata
.receipts
.get(&transaction.tx_hash().to_string())
.unwrap();
if let Err(e) = cache.set(
&format!("receipt:{:?}", transaction.tx_hash().to_string()),
receipt,
Some(10),
) {
let receipt = metadata.receipts.get(&tx_hash).unwrap();
if let Err(e) = cache.set(&format!("receipt:{:?}", tx_hash), receipt, Some(10)) {
error!("Failed to set receipt in cache: {}", e);
continue;
}
// map receipt's block number as well
if let Err(e) = cache.set(
&format!("receipt_block:{:?}", transaction.tx_hash().to_string()),
&format!("receipt_block:{:?}", tx_hash),
&block_number,
Some(10),
) {
Expand All @@ -451,6 +447,10 @@ fn get_and_set_txs_and_receipts(
}
}

if let Err(e) = cache.set(&format!("tx_hashes:{}", block_number), &tx_hashes, Some(10)) {
error!("Failed to update transaction hashes list in cache: {}", e);
}

Ok(diff_receipts)
}

Expand Down Expand Up @@ -635,7 +635,7 @@ mod tests {
payload_id: PayloadId::new([0; 8]),
base: None,
diff: delta2,
metadata: serde_json::to_value(metadata2).unwrap(),
metadata: serde_json::to_value(metadata2.clone()).unwrap(),
}
}

Expand Down Expand Up @@ -833,4 +833,162 @@ mod tests {
let highest = cache.get::<u64>("highest_payload_index").unwrap();
assert_eq!(highest, 0);
}

#[test]
fn test_tx_hash_list_storage_and_deduplication() {
let cache = Arc::new(Cache::default());
let block_number = 1;

let base = ExecutionPayloadBaseV1 {
parent_hash: Default::default(),
parent_beacon_block_root: Default::default(),
fee_recipient: Address::from_str("0x1234567890123456789012345678901234567890").unwrap(),
block_number,
gas_limit: 1000000,
timestamp: 1234567890,
prev_randao: Default::default(),
extra_data: Default::default(),
base_fee_per_gas: U256::from(1000),
};

let tx1 = Bytes::from_str("0x02f87483014a3482017e8459682f0084596830a98301f1d094b01866f195533de16eb929b73f87280693ca0cb480844e71d92dc001a0a658c18bdba29dd4022ee6640fdd143691230c12b3c8c86cf5c1a1f1682cc1e2a0248a28763541ebed2b87ecea63a7024b5c2b7de58539fa64c887b08f5faf29c1").unwrap();

let delta1 = ExecutionPayloadFlashblockDeltaV1 {
transactions: vec![tx1.clone()],
withdrawals: vec![],
state_root: Default::default(),
receipts_root: Default::default(),
logs_bloom: Default::default(),
gas_used: 21000,
block_hash: Default::default(),
};

let tx1_hash =
"0x3cbbc9a6811ac5b2a2e5780bdb67baffc04246a59f39e398be048f1b2d05460c".to_string();

let metadata1 = Metadata {
block_number,
receipts: {
let mut receipts = HashMap::default();
receipts.insert(
tx1_hash.clone(),
OpReceipt::Legacy(Receipt {
status: true.into(),
cumulative_gas_used: 21000,
logs: vec![],
}),
);
receipts
},
new_account_balances: HashMap::default(),
};

let payload1 = FlashblocksPayloadV1 {
index: 0,
payload_id: PayloadId::new([0; 8]),
base: Some(base),
diff: delta1,
metadata: serde_json::to_value(metadata1).unwrap(),
};

process_payload(payload1, cache.clone());

let tx_hashes1 = cache
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
.unwrap();
assert_eq!(tx_hashes1.len(), 1);
assert_eq!(tx_hashes1[0], tx1_hash);

let tx2 = Bytes::from_str("0xf8cd82016d8316e5708302c01c94f39635f2adf40608255779ff742afe13de31f57780b8646e530e9700000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000156ddc81eed2a36d68302948ba0a608703e79b22164f74523d188a11f81c25a65dd59535bab1cd1d8b30d115f3ea07f4cfbbad77a139c9209d3bded89091867ff6b548dd714109c61d1f8e7a84d14").unwrap();

let tx2_hash =
"0xa6155b295085d3b87a3c86e342fe11c3b22f9952d0d85d9d34d223b7d6a17cd8".to_string();

let delta2 = ExecutionPayloadFlashblockDeltaV1 {
transactions: vec![tx1.clone(), tx2.clone()], // Note tx1 is repeated
withdrawals: vec![],
state_root: B256::repeat_byte(0x1),
receipts_root: B256::repeat_byte(0x2),
logs_bloom: Default::default(),
gas_used: 42000,
block_hash: B256::repeat_byte(0x3),
};

let metadata2 = Metadata {
block_number,
receipts: {
let mut receipts = HashMap::default();
receipts.insert(
tx1_hash.clone(),
OpReceipt::Legacy(Receipt {
status: true.into(),
cumulative_gas_used: 21000,
logs: vec![],
}),
);
receipts.insert(
tx2_hash.clone(),
OpReceipt::Legacy(Receipt {
status: true.into(),
cumulative_gas_used: 42000,
logs: vec![],
}),
);
receipts
},
new_account_balances: HashMap::default(),
};

let payload2 = FlashblocksPayloadV1 {
index: 1,
payload_id: PayloadId::new([0; 8]),
base: None,
diff: delta2,
metadata: serde_json::to_value(metadata2.clone()).unwrap(),
};

process_payload(payload2, cache.clone());

let tx_hashes2 = cache
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
.unwrap();
assert_eq!(
tx_hashes2.len(),
2,
"Should have 2 unique transaction hashes"
);
assert_eq!(tx_hashes2[0], tx1_hash, "First hash should be tx1");
assert_eq!(tx_hashes2[1], tx2_hash, "Second hash should be tx2");

let delta3 = ExecutionPayloadFlashblockDeltaV1 {
transactions: vec![tx2.clone(), tx1.clone()], // Different order
withdrawals: vec![],
state_root: B256::repeat_byte(0x1),
receipts_root: B256::repeat_byte(0x2),
logs_bloom: Default::default(),
gas_used: 42000,
block_hash: B256::repeat_byte(0x3),
};

let payload3 = FlashblocksPayloadV1 {
index: 2,
payload_id: PayloadId::new([0; 8]),
base: None,
diff: delta3,
metadata: serde_json::to_value(metadata2).unwrap(), // Same metadata
};

process_payload(payload3, cache.clone());

let tx_hashes3 = cache
.get::<Vec<String>>(&format!("tx_hashes:{}", block_number))
.unwrap();
assert_eq!(
tx_hashes3.len(),
2,
"Should still have 2 unique transaction hashes"
);
assert_eq!(tx_hashes3[0], tx1_hash, "First hash should be tx1");
assert_eq!(tx_hashes3[1], tx2_hash, "Second hash should be tx2");
}
}
1 change: 1 addition & 0 deletions crates/flashblocks-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cache;
pub mod flashblocks;
mod metrics;
pub mod rpc;
pub mod verification;

#[cfg(test)]
mod integration;
12 changes: 12 additions & 0 deletions crates/flashblocks-rpc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ pub struct Metrics {

#[metric(describe = "Number of flashblocks in a block")]
pub flashblocks_in_block: Histogram,

#[metric(describe = "Count of successful block verifications")]
pub block_verification_success: Counter,

#[metric(describe = "Count of failed block verifications")]
pub block_verification_failure: Counter,

#[metric(describe = "Count of blocks not found in cache during verification")]
pub block_verification_not_found: Counter,

#[metric(describe = "Count of transaction count mismatches during verification")]
pub block_verification_tx_count_mismatch: Counter,
}
Loading