diff --git a/Cargo.lock b/Cargo.lock index 8efa707..2c53de8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9661,6 +9661,7 @@ dependencies = [ "jito-tip-router-program", "log", "meta-merkle-tree", + "rand 0.8.5", "serde", "serde_json", "solana-account-decoder 1.18.26 (git+https://github.com/jito-foundation/jito-solana.git?rev=0bbcbe476c0e728907ac01135115e661c16538e5)", @@ -9677,10 +9678,12 @@ dependencies = [ "solana-program-test 1.18.26 (git+https://github.com/jito-foundation/jito-solana.git?rev=0bbcbe476c0e728907ac01135115e661c16538e5)", "solana-rpc", "solana-rpc-client", + "solana-rpc-client-api", "solana-runtime 1.18.26 (git+https://github.com/jito-foundation/jito-solana.git?rev=0bbcbe476c0e728907ac01135115e661c16538e5)", "solana-sdk", "solana-stake-program 1.18.26 (git+https://github.com/jito-foundation/jito-solana.git?rev=0bbcbe476c0e728907ac01135115e661c16538e5)", "solana-streamer", + "solana-transaction-status", "solana-unified-scheduler-pool", "solana-vote 1.18.26 (git+https://github.com/jito-foundation/jito-solana.git?rev=0bbcbe476c0e728907ac01135115e661c16538e5)", "spl-memo", diff --git a/Cargo.toml b/Cargo.toml index fd7cd92..ce69f99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ solana-transaction-status = { package = "solana-transaction-status", git = "http solana-program-test = { package = "solana-program-test", git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } solana-rpc-client = { package = "solana-rpc-client", git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } solana-rpc = { git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } +solana-rpc-client-api = { git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } solana-runtime = { package = "solana-runtime", git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } solana-streamer = { git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } solana-sdk = { package = "solana-sdk", git = "https://github.com/jito-foundation/jito-solana.git", rev = "0bbcbe476c0e728907ac01135115e661c16538e5" } diff --git a/meta_merkle_tree/src/generated_merkle_tree.rs b/meta_merkle_tree/src/generated_merkle_tree.rs index 2e86338..a028cf4 100644 --- a/meta_merkle_tree/src/generated_merkle_tree.rs +++ b/meta_merkle_tree/src/generated_merkle_tree.rs @@ -110,6 +110,15 @@ impl GeneratedMerkleTreeCollection { slot: stake_meta_collection.slot, }) } + + /// Load a serialized GeneratedMerkleTreeCollection from file path + pub fn new_from_file(path: &PathBuf) -> Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + let tree: Self = serde_json::from_reader(reader)?; + + Ok(tree) + } } #[derive(Clone, Eq, Debug, Hash, PartialEq, Deserialize, Serialize)] diff --git a/tip-router-operator-cli/Cargo.toml b/tip-router-operator-cli/Cargo.toml index 26fa851..18b7b68 100644 --- a/tip-router-operator-cli/Cargo.toml +++ b/tip-router-operator-cli/Cargo.toml @@ -25,6 +25,7 @@ jito-tip-router-core = { workspace = true } jito-tip-router-program = { workspace = true } log = { workspace = true } meta-merkle-tree = { workspace = true } +rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } solana-account-decoder = { workspace = true } @@ -40,11 +41,13 @@ solana-metrics = { workspace = true } solana-program = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } +solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-stake-program = { workspace = true } solana-streamer = { workspace = true } solana-unified-scheduler-pool = { workspace = true } +solana-transaction-status = { workspace = true } solana-vote = { workspace = true } spl-memo = { workspace = true } thiserror = { workspace = true } diff --git a/tip-router-operator-cli/src/claim.rs b/tip-router-operator-cli/src/claim.rs new file mode 100644 index 0000000..a47649a --- /dev/null +++ b/tip-router-operator-cli/src/claim.rs @@ -0,0 +1,435 @@ +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; + +use anchor_lang::AccountDeserialize; +use itertools::Itertools; +use jito_tip_distribution_sdk::{ + jito_tip_distribution::accounts::ClaimStatus, TipDistributionAccount, CLAIM_STATUS_SIZE, + CONFIG_SEED, +}; +use jito_tip_router_client::instructions::ClaimWithPayerBuilder; +use jito_tip_router_core::{account_payer::AccountPayer, config::Config}; +use log::{info, warn}; +use meta_merkle_tree::generated_merkle_tree::GeneratedMerkleTreeCollection; +use rand::{prelude::SliceRandom, thread_rng}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSimulateTransactionConfig}; +use solana_metrics::datapoint_info; +use solana_sdk::{ + account::Account, commitment_config::CommitmentConfig, + compute_budget::ComputeBudgetInstruction, + fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL, + pubkey::Pubkey, signature::Keypair, signer::Signer, system_program, transaction::Transaction, +}; +use thiserror::Error; + +use crate::rpc_utils::{get_batched_accounts, send_until_blockhash_expires}; + +#[derive(Error, Debug)] +pub enum ClaimMevError { + #[error(transparent)] + IoError(#[from] std::io::Error), + + #[error(transparent)] + JsonError(#[from] serde_json::Error), + + #[error(transparent)] + AnchorError(anchor_lang::error::Error), + + #[error(transparent)] + RpcError(#[from] solana_rpc_client_api::client_error::Error), + + #[error("Expected to have at least {desired_balance} lamports in {payer:?}. Current balance is {start_balance} lamports. Deposit {sol_to_deposit} SOL to continue.")] + InsufficientBalance { + desired_balance: u64, + payer: Pubkey, + start_balance: u64, + sol_to_deposit: u64, + }, + + #[error("Not finished with job, transactions left {transactions_left}")] + NotFinished { transactions_left: usize }, + + #[error("UncaughtError {e:?}")] + UncaughtError { e: String }, +} + +pub async fn claim_mev_tips( + merkle_trees: &GeneratedMerkleTreeCollection, + rpc_url: String, + rpc_sender_url: String, + tip_distribution_program_id: Pubkey, + tip_router_program_id: Pubkey, + ncn_address: Pubkey, + keypair: Arc, + max_loop_duration: Duration, + micro_lamports: u64, +) -> Result<(), ClaimMevError> { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url, + Duration::from_secs(1800), + CommitmentConfig::confirmed(), + ); + let rpc_sender_client = RpcClient::new(rpc_sender_url); + + let start = Instant::now(); + while start.elapsed() <= max_loop_duration { + let mut all_claim_transactions = get_claim_transactions_for_valid_unclaimed( + &rpc_client, + merkle_trees, + tip_distribution_program_id, + tip_router_program_id, + ncn_address, + micro_lamports, + keypair.pubkey(), + ) + .await?; + + datapoint_info!( + "tip_router_cli.claim_mev_tips-send_summary", + ("claim_transactions_left", all_claim_transactions.len(), i64), + ); + + if all_claim_transactions.is_empty() { + return Ok(()); + } + + all_claim_transactions.shuffle(&mut thread_rng()); + let transactions: Vec<_> = all_claim_transactions.into_iter().take(300).collect(); + + // only check balance for the ones we need to currently send since reclaim rent running in parallel + if let Some((start_balance, desired_balance, sol_to_deposit)) = + is_sufficient_balance(&keypair.pubkey(), &rpc_client, transactions.len() as u64).await + { + return Err(ClaimMevError::InsufficientBalance { + desired_balance, + payer: keypair.pubkey(), + start_balance, + sol_to_deposit, + }); + } + + let blockhash = rpc_client.get_latest_blockhash().await?; + let _ = send_until_blockhash_expires( + &rpc_client, + &rpc_sender_client, + transactions, + blockhash, + &keypair, + ) + .await; + } + + let transactions = get_claim_transactions_for_valid_unclaimed( + &rpc_client, + merkle_trees, + tip_distribution_program_id, + tip_router_program_id, + ncn_address, + micro_lamports, + keypair.pubkey(), + ) + .await?; + if transactions.is_empty() { + return Ok(()); + } + + // if more transactions left, we'll simulate them all to make sure its not an uncaught error + let mut is_error = false; + let mut error_str = String::new(); + for tx in &transactions { + match rpc_client + .simulate_transaction_with_config( + tx, + RpcSimulateTransactionConfig { + sig_verify: false, + replace_recent_blockhash: true, + commitment: Some(CommitmentConfig::processed()), + ..RpcSimulateTransactionConfig::default() + }, + ) + .await + { + Ok(_) => {} + Err(e) => { + error_str = e.to_string(); + is_error = true; + + match e.get_transaction_error() { + None => { + break; + } + Some(e) => { + warn!("transaction error. tx: {:?} error: {:?}", tx, e); + break; + } + } + } + } + } + + if is_error { + Err(ClaimMevError::UncaughtError { e: error_str }) + } else { + Err(ClaimMevError::NotFinished { + transactions_left: transactions.len(), + }) + } +} + +pub async fn get_claim_transactions_for_valid_unclaimed( + rpc_client: &RpcClient, + merkle_trees: &GeneratedMerkleTreeCollection, + tip_distribution_program_id: Pubkey, + tip_router_program_id: Pubkey, + ncn_address: Pubkey, + micro_lamports: u64, + payer_pubkey: Pubkey, +) -> Result, ClaimMevError> { + let tip_router_config_address = + Config::find_program_address(&tip_router_program_id, &ncn_address).0; + let tree_nodes = merkle_trees + .generated_merkle_trees + .iter() + .filter_map(|tree| { + if tree.merkle_root_upload_authority != tip_router_config_address { + return None; + } + Some(&tree.tree_nodes) + }) + .flatten() + .collect_vec(); + + info!( + "reading {} tip distribution related accounts for epoch {}", + tree_nodes.len(), + merkle_trees.epoch + ); + + let start = Instant::now(); + + let tda_pubkeys = merkle_trees + .generated_merkle_trees + .iter() + .map(|tree| tree.tip_distribution_account) + .collect_vec(); + let tdas: HashMap = get_batched_accounts(rpc_client, &tda_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let claimant_pubkeys = tree_nodes + .iter() + .map(|tree_node| tree_node.claimant) + .collect_vec(); + let claimants: HashMap = get_batched_accounts(rpc_client, &claimant_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let claim_status_pubkeys = tree_nodes + .iter() + .map(|tree_node| tree_node.claim_status_pubkey) + .collect_vec(); + let claim_statuses: HashMap = + get_batched_accounts(rpc_client, &claim_status_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let elapsed_us = start.elapsed().as_micros(); + + // can be helpful for determining mismatch in state between requested and read + datapoint_info!( + "tip_router_cli.get_claim_transactions_account_data", + ("elapsed_us", elapsed_us, i64), + ("tdas", tda_pubkeys.len(), i64), + ("tdas_onchain", tdas.len(), i64), + ("claimants", claimant_pubkeys.len(), i64), + ("claimants_onchain", claimants.len(), i64), + ("claim_statuses", claim_status_pubkeys.len(), i64), + ("claim_statuses_onchain", claim_statuses.len(), i64), + ); + + let transactions = build_mev_claim_transactions( + tip_distribution_program_id, + tip_router_program_id, + merkle_trees, + tdas, + claimants, + claim_statuses, + micro_lamports, + payer_pubkey, + ncn_address, + ); + + Ok(transactions) +} + +/// Returns a list of claim transactions for valid, unclaimed MEV tips +/// A valid, unclaimed transaction consists of the following: +/// - there must be lamports to claim for the tip distribution account. +/// - there must be a merkle root. +/// - the claimant (typically a stake account) must exist. +/// - the claimant (typically a stake account) must have a non-zero amount of tips to claim +/// - the claimant must have enough lamports post-claim to be rent-exempt. +/// - note: there aren't any rent exempt accounts on solana mainnet anymore. +/// - it must not have already been claimed. +fn build_mev_claim_transactions( + tip_distribution_program_id: Pubkey, + tip_router_program_id: Pubkey, + merkle_trees: &GeneratedMerkleTreeCollection, + tdas: HashMap, + claimants: HashMap, + claim_status: HashMap, + micro_lamports: u64, + payer_pubkey: Pubkey, + ncn_address: Pubkey, +) -> Vec { + let tip_router_config_address = + Config::find_program_address(&tip_router_program_id, &ncn_address).0; + let tip_router_account_payer = + AccountPayer::find_program_address(&tip_router_program_id, &ncn_address).0; + + let tip_distribution_accounts: HashMap = tdas + .iter() + .filter_map(|(pubkey, account)| { + Some(( + *pubkey, + TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).ok()?, + )) + }) + .collect(); + + let claim_statuses: HashMap = claim_status + .iter() + .filter_map(|(pubkey, account)| { + Some(( + *pubkey, + ClaimStatus::try_deserialize(&mut account.data.as_slice()).ok()?, + )) + }) + .collect(); + + datapoint_info!( + "tip_router_cli.build_mev_claim_transactions", + ( + "tip_distribution_accounts", + tip_distribution_accounts.len(), + i64 + ), + ("claim_statuses", claim_statuses.len(), i64), + ); + + let tip_distribution_config = + Pubkey::find_program_address(&[CONFIG_SEED], &tip_distribution_program_id).0; + + let mut instructions = Vec::with_capacity(claimants.len()); + for tree in &merkle_trees.generated_merkle_trees { + if tree.max_total_claim == 0 { + continue; + } + + // if unwrap panics, there's a bug in the merkle tree code because the merkle tree code relies on the state + // of the chain to claim. + let tip_distribution_account = tip_distribution_accounts + .get(&tree.tip_distribution_account) + .unwrap(); + + // can continue here, as there might be tip distribution accounts this account doesn't upload for + if tip_distribution_account.merkle_root.is_none() + || tip_distribution_account.merkle_root_upload_authority != tip_router_config_address + { + continue; + } + + for node in &tree.tree_nodes { + // doesn't make sense to claim for claimants that don't exist anymore + // can't claim for something already claimed + // don't need to claim for claimants that get 0 MEV + if claimants.get(&node.claimant).is_none() + || claim_statuses.get(&node.claim_status_pubkey).is_some() + || node.amount == 0 + { + continue; + } + + let claim_with_payer_ix = ClaimWithPayerBuilder::new() + .account_payer(tip_router_account_payer) + .ncn(ncn_address) + .config(tip_router_config_address) + .tip_distribution_program(tip_distribution_program_id) + .tip_distribution_config(tip_distribution_config) + .tip_distribution_account(tree.tip_distribution_account) + .claim_status(node.claim_status_pubkey) + .claimant(node.claimant) + .system_program(system_program::id()) + .proof(node.proof.clone().unwrap()) + .amount(node.amount) + .bump(node.claim_status_bump) + .instruction(); + + instructions.push(claim_with_payer_ix); + } + } + + // TODO (LB): see if we can do >1 claim here + let transactions: Vec = instructions + .into_iter() + .map(|claim_ix| { + // helps get txs into block easier since default is 400k CUs + let compute_limit_ix = ComputeBudgetInstruction::set_compute_unit_limit(100_000); + let priority_fee_ix = ComputeBudgetInstruction::set_compute_unit_price(micro_lamports); + Transaction::new_with_payer( + &[compute_limit_ix, priority_fee_ix, claim_ix], + Some(&payer_pubkey), + ) + }) + .collect(); + + transactions +} + +/// heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators +/// If insufficient funds, returns start balance, desired balance, and amount of sol to deposit +async fn is_sufficient_balance( + payer: &Pubkey, + rpc_client: &RpcClient, + instruction_count: u64, +) -> Option<(u64, u64, u64)> { + let start_balance = rpc_client + .get_balance(payer) + .await + .expect("Failed to get starting balance"); + // most amounts are for 0 lamports. had 1736 non-zero claims out of 164742 + let min_rent_per_claim = rpc_client + .get_minimum_balance_for_rent_exemption(CLAIM_STATUS_SIZE) + .await + .expect("Failed to calculate min rent"); + let desired_balance = instruction_count + .checked_mul( + min_rent_per_claim + .checked_add(DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE) + .unwrap(), + ) + .unwrap(); + if start_balance < desired_balance { + let sol_to_deposit = desired_balance + .checked_sub(start_balance) + .unwrap() + .checked_add(LAMPORTS_PER_SOL) + .unwrap() + .checked_sub(1) + .unwrap() + .checked_div(LAMPORTS_PER_SOL) + .unwrap(); // rounds up to nearest sol + Some((start_balance, desired_balance, sol_to_deposit)) + } else { + None + } +} diff --git a/tip-router-operator-cli/src/cli.rs b/tip-router-operator-cli/src/cli.rs index 09169c5..38b2939 100644 --- a/tip-router-operator-cli/src/cli.rs +++ b/tip-router-operator-cli/src/cli.rs @@ -99,4 +99,25 @@ pub enum Commands { #[arg(long, env)] epoch: u64, }, + ClaimTips { + /// Tip distribution program ID + #[arg(long, env)] + tip_distribution_program_id: Pubkey, + + /// Tip router program ID + #[arg(long, env)] + tip_router_program_id: Pubkey, + + /// NCN address + #[arg(long, env)] + ncn_address: Pubkey, + + /// The price to pay for priority fee + #[arg(long, env, default_value_t = 1)] + micro_lamports: u64, + + /// The epoch to Claim tips for + #[arg(long, env)] + epoch: u64, + }, } diff --git a/tip-router-operator-cli/src/lib.rs b/tip-router-operator-cli/src/lib.rs index 03f91f5..c285756 100644 --- a/tip-router-operator-cli/src/lib.rs +++ b/tip-router-operator-cli/src/lib.rs @@ -2,12 +2,14 @@ pub mod ledger_utils; pub mod stake_meta_generator; pub mod tip_router; pub use crate::cli::{Cli, Commands}; +pub mod claim; pub mod cli; pub use crate::process_epoch::process_epoch; pub mod arg_matches; pub mod backup_snapshots; pub mod load_and_process_ledger; pub mod process_epoch; +pub mod rpc_utils; pub mod submit; use std::fs::{self, File}; @@ -28,7 +30,7 @@ use meta_merkle_tree::generated_merkle_tree::MerkleRootGeneratorError; use meta_merkle_tree::{ generated_merkle_tree::GeneratedMerkleTreeCollection, meta_merkle_tree::MetaMerkleTree, }; -use solana_metrics::datapoint_info; +use solana_metrics::{datapoint_error, datapoint_info}; use solana_sdk::{account::AccountSharedData, pubkey::Pubkey, slot_history::Slot}; #[derive(Debug)] @@ -106,6 +108,7 @@ pub fn get_meta_merkle_root( epoch: u64, protocol_fee_bps: u64, snapshots_enabled: bool, + meta_merkle_tree_dir: &PathBuf, ) -> std::result::Result { let start = Instant::now(); @@ -234,6 +237,46 @@ pub fn get_meta_merkle_root( ("duration_ms", start.elapsed().as_millis() as i64, i64) ); + // TODO: Hide this behind a flag when the process gets split up into the various stages and + // checkpoints. + + // Write GeneratedMerkleTreeCollection to disk. Required for Claiming + let merkle_tree_coll_path = + meta_merkle_tree_dir.join(format!("generated_merkle_tree_{}.json", epoch)); + let generated_merkle_tree_col_json = match serde_json::to_string(&merkle_tree_coll) { + Ok(json) => json, + Err(e) => { + datapoint_error!( + "tip_router_cli.process_epoch", + ("operator_address", operator_address.to_string(), String), + ("epoch", epoch, i64), + ("status", "error", String), + ("error", format!("{:?}", e), String), + ("state", "merkle_root_serialization", String), + ("duration_ms", start.elapsed().as_millis() as i64, i64) + ); + return Err(MerkleRootError::MerkleRootGeneratorError( + "Failed to serialize merkle tree collection".to_string(), + )); + } + }; + + if let Err(e) = std::fs::write(&merkle_tree_coll_path, generated_merkle_tree_col_json) { + datapoint_error!( + "tip_router_cli.process_epoch", + ("operator_address", operator_address.to_string(), String), + ("epoch", epoch, i64), + ("status", "error", String), + ("error", format!("{:?}", e), String), + ("state", "merkle_root_file_write", String), + ("duration_ms", start.elapsed().as_millis() as i64, i64) + ); + // TODO: propogate error + return Err(MerkleRootError::MerkleRootGeneratorError( + "Failed to write meta merkle tree to file".to_string(), + )); + } + // Convert to MetaMerkleTree let meta_merkle_tree = MetaMerkleTree::new_from_generated_merkle_tree_collection( merkle_tree_coll, diff --git a/tip-router-operator-cli/src/main.rs b/tip-router-operator-cli/src/main.rs index 972e688..82e6031 100644 --- a/tip-router-operator-cli/src/main.rs +++ b/tip-router-operator-cli/src/main.rs @@ -2,8 +2,10 @@ use ::{ anyhow::Result, clap::Parser, ellipsis_client::{ClientSubset, EllipsisClient}, + jito_tip_router_core::config::Config, log::{error, info}, - solana_metrics::set_host_id, + meta_merkle_tree::generated_merkle_tree::GeneratedMerkleTreeCollection, + solana_metrics::{datapoint_error, datapoint_info, set_host_id}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ clock::DEFAULT_SLOTS_PER_EPOCH, @@ -11,9 +13,15 @@ use ::{ signer::{keypair::read_keypair_file, Signer}, transaction::Transaction, }, - std::{path::PathBuf, str::FromStr, time::Duration}, + std::{ + path::PathBuf, + str::FromStr, + sync::Arc, + time::{Duration, Instant}, + }, tip_router_operator_cli::{ backup_snapshots::BackupSnapshotMonitor, + claim::claim_mev_tips, cli::{Cli, Commands}, process_epoch::{get_previous_epoch_last_slot, process_epoch, wait_for_next_epoch}, submit::{submit_recent_epochs_to_ncn, submit_to_ncn}, @@ -246,6 +254,59 @@ async fn main() -> Result<()> { ) .await?; } + Commands::ClaimTips { + tip_distribution_program_id, + tip_router_program_id, + ncn_address, + micro_lamports, + epoch, + } => { + let start = Instant::now(); + info!("Claiming tips..."); + + let arc_keypair = Arc::new(keypair); + // Load the GeneratedMerkleTreeCollection, which should have been previously generated + let merkle_tree_coll_path = PathBuf::from(format!( + "{}/generated_merkle_tree_{}.json", + cli.meta_merkle_tree_dir.display(), + epoch + )); + let merkle_tree_coll = + GeneratedMerkleTreeCollection::new_from_file(&merkle_tree_coll_path)?; + + match claim_mev_tips( + &merkle_tree_coll, + cli.rpc_url.clone(), + // TODO: Review if we should offer separate rpc_send_url in CLI. This may be used + // if sending via block engine. + cli.rpc_url, + tip_distribution_program_id, + tip_router_program_id, + ncn_address, + arc_keypair, + Duration::from_secs(3600), + micro_lamports, + ) + .await + { + Err(e) => { + datapoint_error!( + "claim_mev_workflow-claim_error", + ("epoch", epoch, i64), + ("error", 1, i64), + ("err_str", e.to_string(), String), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + } + Ok(()) => { + datapoint_info!( + "claim_mev_workflow-claim_completion", + ("epoch", epoch, i64), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + } + } + } } Ok(()) } diff --git a/tip-router-operator-cli/src/process_epoch.rs b/tip-router-operator-cli/src/process_epoch.rs index ae4caf3..e03c54b 100644 --- a/tip-router-operator-cli/src/process_epoch.rs +++ b/tip-router-operator-cli/src/process_epoch.rs @@ -143,6 +143,7 @@ pub async fn process_epoch( target_epoch, adjusted_total_fees, snapshots_enabled, + &meta_merkle_tree_dir, ) { Ok(tree) => { datapoint_info!( diff --git a/tip-router-operator-cli/src/rpc_utils.rs b/tip-router-operator-cli/src/rpc_utils.rs new file mode 100644 index 0000000..27b7a34 --- /dev/null +++ b/tip-router-operator-cli/src/rpc_utils.rs @@ -0,0 +1,148 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use log::{info, warn}; +use solana_client::{ + nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction, + rpc_config::RpcSendTransactionConfig, rpc_request::MAX_MULTIPLE_ACCOUNTS, +}; +use solana_sdk::{ + account::Account, + commitment_config::{CommitmentConfig, CommitmentLevel}, + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signature}, + transaction::{Transaction, TransactionError}, +}; +use solana_transaction_status::TransactionStatus; +use tokio::time::sleep; + +pub async fn get_batched_accounts( + rpc_client: &RpcClient, + pubkeys: &[Pubkey], +) -> solana_rpc_client_api::client_error::Result>> { + let mut batched_accounts = HashMap::new(); + + for pubkeys_chunk in pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS) { + let accounts = rpc_client.get_multiple_accounts(pubkeys_chunk).await?; + batched_accounts.extend(pubkeys_chunk.iter().cloned().zip(accounts)); + } + Ok(batched_accounts) +} + +pub async fn send_until_blockhash_expires( + rpc_client: &RpcClient, + rpc_sender_client: &RpcClient, + transactions: Vec, + blockhash: Hash, + keypair: &Arc, +) -> solana_rpc_client_api::client_error::Result<()> { + let mut claim_transactions: HashMap = transactions + .into_iter() + .map(|mut tx| { + tx.sign(&[&keypair], blockhash); + (*tx.get_signature(), tx) + }) + .collect(); + + let txs_requesting_send = claim_transactions.len(); + + while rpc_client + .is_blockhash_valid(&blockhash, CommitmentConfig::processed()) + .await? + { + let mut check_signatures = HashSet::with_capacity(claim_transactions.len()); + let mut already_processed = HashSet::with_capacity(claim_transactions.len()); + let mut is_blockhash_not_found = false; + + for (signature, tx) in &claim_transactions { + match rpc_sender_client + .send_transaction_with_config( + tx, + RpcSendTransactionConfig { + skip_preflight: false, + preflight_commitment: Some(CommitmentLevel::Confirmed), + max_retries: Some(2), + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(_) => { + check_signatures.insert(*signature); + } + Err(e) => match e.get_transaction_error() { + Some(TransactionError::BlockhashNotFound) => { + is_blockhash_not_found = true; + break; + } + Some(TransactionError::AlreadyProcessed) => { + already_processed.insert(*tx.get_signature()); + } + Some(e) => { + warn!( + "TransactionError sending signature: {} error: {:?} tx: {:?}", + tx.get_signature(), + e, + tx + ); + } + None => { + warn!( + "Unknown error sending transaction signature: {} error: {:?}", + tx.get_signature(), + e + ); + } + }, + } + } + + sleep(Duration::from_secs(10)).await; + + let signatures: Vec = check_signatures.iter().cloned().collect(); + let statuses = get_batched_signatures_statuses(rpc_client, &signatures).await?; + + for (signature, maybe_status) in &statuses { + if let Some(_status) = maybe_status { + claim_transactions.remove(signature); + check_signatures.remove(signature); + } + } + + for signature in already_processed { + claim_transactions.remove(&signature); + } + + if claim_transactions.is_empty() || is_blockhash_not_found { + break; + } + } + + let num_landed = txs_requesting_send + .checked_sub(claim_transactions.len()) + .unwrap(); + info!("num_landed: {:?}", num_landed); + + Ok(()) +} + +pub async fn get_batched_signatures_statuses( + rpc_client: &RpcClient, + signatures: &[Signature], +) -> solana_rpc_client_api::client_error::Result)>> { + let mut signature_statuses = Vec::new(); + + for signatures_batch in signatures.chunks(100) { + // was using get_signature_statuses_with_history, but it blocks if the signatures don't exist + // bigtable calls to read signatures that don't exist block forever w/o --rpc-bigtable-timeout argument set + // get_signature_statuses looks in status_cache, which only has a 150 block history + // may have false negative, but for this workflow it doesn't matter + let statuses = rpc_client.get_signature_statuses(signatures_batch).await?; + signature_statuses.extend(signatures_batch.iter().cloned().zip(statuses.value)); + } + Ok(signature_statuses) +} diff --git a/tip-router-operator-cli/tests/integration_tests.rs b/tip-router-operator-cli/tests/integration_tests.rs index bd9834b..2ef7268 100644 --- a/tip-router-operator-cli/tests/integration_tests.rs +++ b/tip-router-operator-cli/tests/integration_tests.rs @@ -186,10 +186,7 @@ impl TestContext { async fn test_meta_merkle_creation_from_ledger() { // 1. Setup - create necessary variables/arguments let ledger_path = Path::new("tests/fixtures/test-ledger"); - let account_paths = vec![ - PathBuf::from("tests/fixtures/accounts"), - PathBuf::from("path/to/account2"), - ]; + let account_paths = vec![ledger_path.join("accounts/run")]; let full_snapshots_path = PathBuf::from("tests/fixtures/test-ledger"); let desired_slot = &144; let tip_distribution_program_id = &TIP_DISTRIBUTION_ID; @@ -216,6 +213,7 @@ async fn test_meta_merkle_creation_from_ledger() { epoch, PROTOCOL_FEE_BPS, false, + &ledger_path.to_path_buf(), ) .unwrap();