Skip to content

Commit

Permalink
add example for comparing subscriptions from different providers (#148)
Browse files Browse the repository at this point in the history
* new_heads example

* new_heads & pending_txs draft

* cleanup

* use take so it terminates

* fix clippy warnings

* prefix name with compare_ for better indexing

* add to readme

---------

Co-authored-by: zerosnacks <[email protected]>
  • Loading branch information
0xlosh and zerosnacks authored Oct 9, 2024
1 parent c184890 commit 0018187
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ This repository contains the following examples:
- [x] [Static encoding with `sol!`](./examples/advanced/examples/encoding_sol_static.rs)
- [x] [Using `foundry-fork-db`](./examples/advanced/examples/foundry_fork_db.rs)
- [x] [Wrapping `Provider` trait over reth-db](./examples/advanced/examples/reth_db_provider.rs)
- [x] Comparison
- [x] [Compare block headers between providers](./examples/comparison/examples/compare_block_headers.rs)
- [x] [Compare pending transactions between providers](./examples/comparison/examples/compare_pending_transactions.rs)

## Contributing

Expand Down
22 changes: 22 additions & 0 deletions examples/comparison/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "examples-comparison"
publish.workspace = true
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[lints]
workspace = true

[dev-dependencies]
alloy.workspace = true

chrono = "0.4"
clap = { version = "4.5", features = ["derive"] }
eyre.workspace = true
futures-util.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
106 changes: 106 additions & 0 deletions examples/comparison/examples/compare_new_heads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Example of comparing new block headers from multiple providers.
use alloy::{
network::AnyNetwork,
providers::{Provider, ProviderBuilder},
};
use chrono::Utc;
use clap::Parser;
use eyre::Result;
use futures_util::StreamExt;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;

#[derive(Debug, Parser)]
struct Cli {
#[clap(short = 'r', help = "rpcs to connect to, usage: -r <name>:<url> -r <name>:<url> ...")]
rpcs: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let rpcs: Vec<(&str, &str)> = cli.rpcs.iter().filter_map(|s| s.split_once(':')).collect();

let mut total_streams = 0;
let mut tasks = vec![];
let (sx, mut rx) = mpsc::unbounded_channel();
for (name, url) in rpcs {
let sx = sx.clone();
let name = Arc::new(name.to_string());
let url = url.to_string();

let provider = match ProviderBuilder::new().network::<AnyNetwork>().on_builtin(&url).await {
Ok(provider) => provider,
Err(e) => {
eprintln!("skipping {} at {} because of error: {}", name, url, e);
continue;
}
};

let mut stream = match provider.subscribe_blocks().await {
Ok(stream) => stream.into_stream().take(10),
Err(e) => {
eprintln!("skipping {} at {} because of error: {}", name, url, e);
continue;
}
};

total_streams += 1;

tasks.push(tokio::spawn(async move {
let _p = provider; // keep provider alive
while let Some(block) = stream.next().await {
if let Err(e) = sx.send((name.clone(), block, Utc::now())) {
eprintln!("sending to channel failed: {}", e);
}
}
}));
}

tokio::spawn(async move {
#[derive(Debug)]
struct TxTrack {
first_seen: chrono::DateTime<Utc>,
seen_by: Vec<(Arc<String>, chrono::DateTime<Utc>)>,
}

let mut tracker = HashMap::new();
while let Some((name, block, timestamp)) = rx.recv().await {
let block_number = block.header.number;
let track = tracker
.entry(block_number)
.and_modify(|t: &mut TxTrack| {
t.seen_by.push((name.clone(), timestamp));
})
.or_insert_with(|| TxTrack {
first_seen: timestamp,
seen_by: vec![(name.clone(), timestamp)],
});

if track.seen_by.len() == total_streams {
let mut msg = String::new();
for (name, timestamp) in &track.seen_by {
msg.push_str(&format!(
"{} +{}ms ",
name,
(*timestamp - track.first_seen).num_milliseconds()
));
}
println!(
"block #{} at {} - {}",
block_number,
track.first_seen.timestamp_millis(),
msg
);
tracker.remove(&block_number);
}
}
});

for task in tasks {
task.await?;
}

Ok(())
}
113 changes: 113 additions & 0 deletions examples/comparison/examples/compare_pending_txs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Example of comparing pending transactions from multiple providers.
use alloy::{
network::AnyNetwork,
providers::{Provider, ProviderBuilder},
};
use chrono::Utc;
use clap::Parser;
use eyre::Result;
use futures_util::StreamExt;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;

#[derive(Debug, Parser)]
struct Cli {
#[clap(short = 'r', help = "rpcs to connect to, usage: -r <name>:<url> -r <name>:<url> ...")]
rpcs: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let tmp: Vec<(&str, &str)> = cli.rpcs.iter().filter_map(|s| s.split_once(':')).collect();
let mut rpcs = vec![];
for (name, url) in tmp {
if url.starts_with("http") {
eprintln!("skipping {} at {} because it is not a websocket/ipc endpoint", name, url);
continue;
}
rpcs.push((name, url));
}

let mut total_streams = 0;
let mut tasks = vec![];
let (sx, mut rx) = mpsc::unbounded_channel();
for (name, url) in &rpcs {
let sx = sx.clone();
let name = Arc::new(name.to_string());
let url = url.to_string();

let provider = match ProviderBuilder::new().network::<AnyNetwork>().on_builtin(&url).await {
Ok(provider) => provider,
Err(e) => {
eprintln!("skipping {} at {} because of error: {}", name, url, e);
continue;
}
};

let mut stream = match provider.subscribe_pending_transactions().await {
Ok(stream) => stream.into_stream().take(50),
Err(e) => {
eprintln!("skipping {} at {} because of error: {}", name, url, e);
continue;
}
};

total_streams += 1;

tasks.push(tokio::spawn(async move {
let _p = provider; // keep provider alive
while let Some(tx_hash) = stream.next().await {
if let Err(e) = sx.send((name.clone(), tx_hash, Utc::now())) {
eprintln!("sending to channel failed: {}", e);
}
}
}));
}

tokio::spawn(async move {
#[derive(Debug)]
struct TxTrack {
first_seen: chrono::DateTime<Utc>,
seen_by: Vec<(Arc<String>, chrono::DateTime<Utc>)>,
}

let mut tracker = HashMap::new();
while let Some((name, tx_hash, timestamp)) = rx.recv().await {
let track = tracker
.entry(tx_hash)
.and_modify(|t: &mut TxTrack| {
t.seen_by.push((name.clone(), timestamp));
})
.or_insert_with(|| TxTrack {
first_seen: timestamp,
seen_by: vec![(name.clone(), timestamp)],
});

if track.seen_by.len() == total_streams {
let mut msg = String::new();
for (name, timestamp) in &track.seen_by {
msg.push_str(&format!(
"{} +{}ms ",
name,
(*timestamp - track.first_seen).num_milliseconds()
));
}
println!(
"pending tx #{} at {} - {}",
tx_hash,
track.first_seen.timestamp_millis(),
msg
);
tracker.remove(&tx_hash);
}
}
});

for task in tasks {
task.await?;
}

Ok(())
}

0 comments on commit 0018187

Please sign in to comment.