diff --git a/README.md b/README.md index 79f6ee2..06a6254 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,9 @@ This repository contains the following examples: - [x] [Static encoding with `sol!`](./examples/advanced/examples/encoding_sol_static.rs) - [x] [Using `foundry-fork-db`](./examples/advanced/examples/foundry_fork_db.rs) - [x] [Wrapping `Provider` trait over reth-db](./examples/advanced/examples/reth_db_provider.rs) +- [x] Comparison + - [x] [Compare block headers between providers](./examples/comparison/examples/compare_block_headers.rs) + - [x] [Compare pending transactions between providers](./examples/comparison/examples/compare_pending_transactions.rs) ## Contributing diff --git a/examples/comparison/Cargo.toml b/examples/comparison/Cargo.toml new file mode 100644 index 0000000..a4d1661 --- /dev/null +++ b/examples/comparison/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "examples-comparison" +publish.workspace = true +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dev-dependencies] +alloy.workspace = true + +chrono = "0.4" +clap = { version = "4.5", features = ["derive"] } +eyre.workspace = true +futures-util.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/examples/comparison/examples/compare_new_heads.rs b/examples/comparison/examples/compare_new_heads.rs new file mode 100644 index 0000000..772dfc3 --- /dev/null +++ b/examples/comparison/examples/compare_new_heads.rs @@ -0,0 +1,106 @@ +//! Example of comparing new block headers from multiple providers. + +use alloy::{ + network::AnyNetwork, + providers::{Provider, ProviderBuilder}, +}; +use chrono::Utc; +use clap::Parser; +use eyre::Result; +use futures_util::StreamExt; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc; + +#[derive(Debug, Parser)] +struct Cli { + #[clap(short = 'r', help = "rpcs to connect to, usage: -r : -r : ...")] + rpcs: Vec, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + let rpcs: Vec<(&str, &str)> = cli.rpcs.iter().filter_map(|s| s.split_once(':')).collect(); + + let mut total_streams = 0; + let mut tasks = vec![]; + let (sx, mut rx) = mpsc::unbounded_channel(); + for (name, url) in rpcs { + let sx = sx.clone(); + let name = Arc::new(name.to_string()); + let url = url.to_string(); + + let provider = match ProviderBuilder::new().network::().on_builtin(&url).await { + Ok(provider) => provider, + Err(e) => { + eprintln!("skipping {} at {} because of error: {}", name, url, e); + continue; + } + }; + + let mut stream = match provider.subscribe_blocks().await { + Ok(stream) => stream.into_stream().take(10), + Err(e) => { + eprintln!("skipping {} at {} because of error: {}", name, url, e); + continue; + } + }; + + total_streams += 1; + + tasks.push(tokio::spawn(async move { + let _p = provider; // keep provider alive + while let Some(block) = stream.next().await { + if let Err(e) = sx.send((name.clone(), block, Utc::now())) { + eprintln!("sending to channel failed: {}", e); + } + } + })); + } + + tokio::spawn(async move { + #[derive(Debug)] + struct TxTrack { + first_seen: chrono::DateTime, + seen_by: Vec<(Arc, chrono::DateTime)>, + } + + let mut tracker = HashMap::new(); + while let Some((name, block, timestamp)) = rx.recv().await { + let block_number = block.header.number; + let track = tracker + .entry(block_number) + .and_modify(|t: &mut TxTrack| { + t.seen_by.push((name.clone(), timestamp)); + }) + .or_insert_with(|| TxTrack { + first_seen: timestamp, + seen_by: vec![(name.clone(), timestamp)], + }); + + if track.seen_by.len() == total_streams { + let mut msg = String::new(); + for (name, timestamp) in &track.seen_by { + msg.push_str(&format!( + "{} +{}ms ", + name, + (*timestamp - track.first_seen).num_milliseconds() + )); + } + println!( + "block #{} at {} - {}", + block_number, + track.first_seen.timestamp_millis(), + msg + ); + tracker.remove(&block_number); + } + } + }); + + for task in tasks { + task.await?; + } + + Ok(()) +} diff --git a/examples/comparison/examples/compare_pending_txs.rs b/examples/comparison/examples/compare_pending_txs.rs new file mode 100644 index 0000000..982d89a --- /dev/null +++ b/examples/comparison/examples/compare_pending_txs.rs @@ -0,0 +1,113 @@ +//! Example of comparing pending transactions from multiple providers. + +use alloy::{ + network::AnyNetwork, + providers::{Provider, ProviderBuilder}, +}; +use chrono::Utc; +use clap::Parser; +use eyre::Result; +use futures_util::StreamExt; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc; + +#[derive(Debug, Parser)] +struct Cli { + #[clap(short = 'r', help = "rpcs to connect to, usage: -r : -r : ...")] + rpcs: Vec, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + let tmp: Vec<(&str, &str)> = cli.rpcs.iter().filter_map(|s| s.split_once(':')).collect(); + let mut rpcs = vec![]; + for (name, url) in tmp { + if url.starts_with("http") { + eprintln!("skipping {} at {} because it is not a websocket/ipc endpoint", name, url); + continue; + } + rpcs.push((name, url)); + } + + let mut total_streams = 0; + let mut tasks = vec![]; + let (sx, mut rx) = mpsc::unbounded_channel(); + for (name, url) in &rpcs { + let sx = sx.clone(); + let name = Arc::new(name.to_string()); + let url = url.to_string(); + + let provider = match ProviderBuilder::new().network::().on_builtin(&url).await { + Ok(provider) => provider, + Err(e) => { + eprintln!("skipping {} at {} because of error: {}", name, url, e); + continue; + } + }; + + let mut stream = match provider.subscribe_pending_transactions().await { + Ok(stream) => stream.into_stream().take(50), + Err(e) => { + eprintln!("skipping {} at {} because of error: {}", name, url, e); + continue; + } + }; + + total_streams += 1; + + tasks.push(tokio::spawn(async move { + let _p = provider; // keep provider alive + while let Some(tx_hash) = stream.next().await { + if let Err(e) = sx.send((name.clone(), tx_hash, Utc::now())) { + eprintln!("sending to channel failed: {}", e); + } + } + })); + } + + tokio::spawn(async move { + #[derive(Debug)] + struct TxTrack { + first_seen: chrono::DateTime, + seen_by: Vec<(Arc, chrono::DateTime)>, + } + + let mut tracker = HashMap::new(); + while let Some((name, tx_hash, timestamp)) = rx.recv().await { + let track = tracker + .entry(tx_hash) + .and_modify(|t: &mut TxTrack| { + t.seen_by.push((name.clone(), timestamp)); + }) + .or_insert_with(|| TxTrack { + first_seen: timestamp, + seen_by: vec![(name.clone(), timestamp)], + }); + + if track.seen_by.len() == total_streams { + let mut msg = String::new(); + for (name, timestamp) in &track.seen_by { + msg.push_str(&format!( + "{} +{}ms ", + name, + (*timestamp - track.first_seen).num_milliseconds() + )); + } + println!( + "pending tx #{} at {} - {}", + tx_hash, + track.first_seen.timestamp_millis(), + msg + ); + tracker.remove(&tx_hash); + } + } + }); + + for task in tasks { + task.await?; + } + + Ok(()) +}