Skip to content

Commit b5d0db9

Browse files
authored
feat: collect metrics on submitBlock transaction revert/success (#48)
* feat: collect metrics on submitBlock transaction revert/success * address PR comments
1 parent 56371c8 commit b5d0db9

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed

bin/builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use builder::config::BuilderConfig;
44
use builder::service::serve_builder_with_span;
55
use builder::tasks::block::BlockBuilder;
6+
use builder::tasks::metrics::MetricsTask;
67
use builder::tasks::oauth::Authenticator;
78
use builder::tasks::submit::SubmitTask;
89
use metrics_exporter_prometheus::PrometheusBuilder;
@@ -27,13 +28,18 @@ async fn main() -> eyre::Result<()> {
2728
let zenith = config.connect_zenith(host_provider.clone());
2829

2930
let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider);
31+
32+
let metrics = MetricsTask { host_provider: host_provider.clone() };
33+
let (tx_channel, metrics_jh) = metrics.spawn();
34+
3035
let submit = SubmitTask {
3136
authenticator: authenticator.clone(),
3237
host_provider,
3338
zenith,
3439
client: reqwest::Client::new(),
3540
sequencer_signer,
3641
config: config.clone(),
42+
outbound_tx_channel: tx_channel,
3743
};
3844

3945
let authenticator_jh = authenticator.spawn();
@@ -47,6 +53,9 @@ async fn main() -> eyre::Result<()> {
4753
_ = submit_jh => {
4854
tracing::info!("submit finished");
4955
},
56+
_ = metrics_jh => {
57+
tracing::info!("metrics finished");
58+
},
5059
_ = build_jh => {
5160
tracing::info!("build finished");
5261
}

src/tasks/metrics.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::config::Provider;
2+
use alloy::{primitives::TxHash, providers::Provider as _};
3+
use metrics::{counter, histogram};
4+
use std::time::Instant;
5+
use tokio::{sync::mpsc, task::JoinHandle};
6+
use tracing::{debug, error};
7+
8+
/// Collects metrics on transactions sent by the Builder
9+
#[derive(Debug, Clone)]
10+
pub struct MetricsTask {
11+
/// Ethereum Provider
12+
pub host_provider: Provider,
13+
}
14+
15+
impl MetricsTask {
16+
/// Given a transaction hash, record metrics on the result of the transaction mining
17+
pub async fn log_tx(&self, pending_tx_hash: TxHash) {
18+
// start timer when tx hash is received
19+
let start: Instant = Instant::now();
20+
21+
// wait for the tx to mine, get its receipt
22+
let receipt_result =
23+
self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await;
24+
25+
match receipt_result {
26+
Ok(maybe_receipt) => {
27+
match maybe_receipt {
28+
Some(receipt) => {
29+
// record how long it took to mine the transaction
30+
// potential improvement: use the block timestamp to calculate the time elapsed
31+
histogram!("metrics.tx_mine_time")
32+
.record(start.elapsed().as_millis() as f64);
33+
34+
// log whether the transaction reverted
35+
if receipt.status() {
36+
counter!("metrics.tx_reverted").increment(1);
37+
debug!(tx_hash = %pending_tx_hash, "tx reverted");
38+
} else {
39+
counter!("metrics.tx_succeeded").increment(1);
40+
debug!(tx_hash = %pending_tx_hash, "tx succeeded");
41+
}
42+
}
43+
None => {
44+
counter!("metrics.no_receipt").increment(1);
45+
error!("no receipt found for tx hash");
46+
}
47+
}
48+
}
49+
Err(e) => {
50+
counter!("metrics.rpc_error").increment(1);
51+
error!(error = ?e, "rpc error");
52+
}
53+
}
54+
}
55+
56+
/// Spawns the task which collects metrics on pending transactions
57+
pub fn spawn(self) -> (mpsc::UnboundedSender<TxHash>, JoinHandle<()>) {
58+
let (sender, mut inbound) = mpsc::unbounded_channel();
59+
let handle = tokio::spawn(async move {
60+
debug!("metrics task spawned");
61+
loop {
62+
if let Some(pending_tx_hash) = inbound.recv().await {
63+
let this = self.clone();
64+
tokio::spawn(async move {
65+
debug!("received tx hash");
66+
let that = this.clone();
67+
that.log_tx(pending_tx_hash).await;
68+
debug!("logged tx metrics");
69+
});
70+
} else {
71+
tracing::debug!("upstream task gone");
72+
break;
73+
}
74+
}
75+
});
76+
77+
(sender, handle)
78+
}
79+
}

src/tasks/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod block;
22
pub mod bundler;
3+
pub mod metrics;
34
pub mod oauth;
45
pub mod submit;
56
pub mod tx_poller;

src/tasks/submit.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use alloy::{
88
consensus::{constants::GWEI_TO_WEI, SimpleCoder},
99
eips::BlockNumberOrTag,
1010
network::{TransactionBuilder, TransactionBuilder4844},
11-
primitives::{FixedBytes, U256},
11+
primitives::{FixedBytes, TxHash, U256},
1212
providers::{Provider as _, SendableTx, WalletProvider},
1313
rpc::types::eth::TransactionRequest,
1414
signers::Signer,
@@ -59,6 +59,8 @@ pub struct SubmitTask {
5959
pub config: crate::config::BuilderConfig,
6060
/// Authenticator
6161
pub authenticator: Authenticator,
62+
// Channel over which to send pending transactions
63+
pub outbound_tx_channel: mpsc::UnboundedSender<TxHash>,
6264
}
6365

6466
impl SubmitTask {
@@ -192,6 +194,11 @@ impl SubmitTask {
192194
spawn_provider_send!(&host_provider, &tx);
193195
}
194196

197+
// send the in-progress transaction over the outbound_tx_channel
198+
if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() {
199+
tracing::error!("receipts task gone");
200+
}
201+
195202
// question mark unwraps join error, which would be an internal panic
196203
// then if let checks for rpc error
197204
if let Err(e) = fut.await? {

0 commit comments

Comments
 (0)