Skip to content

refactor: fix rate limiter #1636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions forester-utils/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Duration;
use account_compression::processor::initialize_address_merkle_tree::Pubkey;
use anchor_lang::solana_program::system_instruction;
use light_client::{
Expand All @@ -18,13 +19,15 @@ pub async fn airdrop_lamports<R: RpcConnection>(
let transfer_instruction =
system_instruction::transfer(&rpc.get_payer().pubkey(), destination_pubkey, lamports);
let latest_blockhash = rpc.get_latest_blockhash().await?;

let payer = rpc.get_payer().insecure_clone();
let transaction = Transaction::new_signed_with_payer(
&[transfer_instruction],
Some(&rpc.get_payer().pubkey()),
&vec![&rpc.get_payer()],
Some(&payer.pubkey()),
&[&payer],
latest_blockhash,
);
rpc.process_transaction(transaction).await?;
rpc.process_transaction(transaction, &[&payer]).await?;
Ok(())
}

Expand Down Expand Up @@ -63,7 +66,10 @@ pub async fn wait_for_indexer<R: RpcConnection, I: Indexer<R>>(
"waiting for indexer to catch up, rpc_slot: {}, indexer_slot: {}",
rpc_slot, indexer_slot
);
sleep(std::time::Duration::from_millis(400)).await;
{
sleep(Duration::from_millis(400)).await;
tokio::task::yield_now().await;
}
indexer_slot = indexer.get_indexer_slot(rpc).await.map_err(|e| {
error!("failed to get indexer slot from indexer: {:?}", e);
ForesterUtilsError::Indexer("Failed to get indexer slot".into())
Expand Down
5 changes: 5 additions & 0 deletions forester/src/batch_processor/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub(crate) async fn process_batch<R: RpcConnection, I: Indexer<R> + IndexerType<
})?,
);

if !context.is_eligible() {
debug!("Skipping address update due to eligibility check");
return Ok(0);
}

let tx = rpc
.create_and_send_transaction_with_event::<MerkleTreeEvent>(
&[instruction],
Expand Down
19 changes: 17 additions & 2 deletions forester/src/batch_processor/common.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;

use std::time::Duration;
use forester_utils::forester_epoch::TreeType;
use light_batched_merkle_tree::{
batch::{Batch, BatchState},
Expand All @@ -10,8 +10,8 @@ use light_client::{indexer::Indexer, rpc::RpcConnection, rpc_pool::SolanaRpcPool
use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info, log::error};

use super::{address, error::Result, state, BatchProcessError};
use crate::indexer_type::IndexerType;

Expand All @@ -25,6 +25,16 @@ pub struct BatchContext<R: RpcConnection, I: Indexer<R>> {
pub merkle_tree: Pubkey,
pub output_queue: Pubkey,
pub ixs_per_tx: usize,
pub duration: Duration,
pub start_time: Instant,
}

impl<R: RpcConnection, I: Indexer<R>> BatchContext<R, I> {
    /// Returns `true` while this context is still within its eligibility
    /// window, i.e. no more than `self.duration` has elapsed since
    /// `self.start_time` (the window is inclusive of the boundary instant).
    ///
    /// Used by the batch processors to bail out of further transaction
    /// submission once the forester's light-slot window has passed.
    pub fn is_eligible(&self) -> bool {
        // `elapsed()` is the idiomatic equivalent of
        // `Instant::now().duration_since(self.start_time)`.
        self.start_time.elapsed() <= self.duration
    }
}

#[derive(Debug)]
Expand Down Expand Up @@ -53,6 +63,11 @@ impl<R: RpcConnection, I: Indexer<R> + IndexerType<R>> BatchProcessor<R, I> {
let state = self.verify_batch_ready().await;
debug!("Batch ready state: {:?}", state);

if !self.context.is_eligible() {
debug!("Forester is not eligible");
return Ok(0);
}

match state {
BatchReadyState::ReadyForAppend => match self.tree_type {
TreeType::BatchedAddress => address::process_batch(&self.context).await,
Expand Down
10 changes: 9 additions & 1 deletion forester/src/batch_processor/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub(crate) async fn perform_append<R: RpcConnection, I: Indexer<R> + IndexerType
));
}

if !context.is_eligible() {
debug!("Skipping append transaction chunk due to eligibility check");
return Ok(());
}
match rpc
.create_and_send_transaction(
&instructions,
Expand Down Expand Up @@ -181,6 +185,11 @@ pub(crate) async fn perform_nullify<R: RpcConnection, I: Indexer<R> + IndexerTyp
));
}

if !context.is_eligible() {
debug!("Skipping append transaction chunk due to eligibility check");
return Ok(());
}

match rpc
.create_and_send_transaction(
&instructions,
Expand All @@ -196,7 +205,6 @@ pub(crate) async fn perform_nullify<R: RpcConnection, I: Indexer<R> + IndexerTyp
instruction_data_vec.len().div_ceil(context.ixs_per_tx),
tx
);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!(
Expand Down
4 changes: 4 additions & 0 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ impl<R: RpcConnection, I: Indexer<R> + IndexerType<R>> EpochManager<R, I> {
);
if attempt < max_retries - 1 {
sleep(retry_delay).await;
tokio::task::yield_now().await;
} else {
if let Err(alert_err) = send_pagerduty_alert(
&self
Expand Down Expand Up @@ -876,6 +877,8 @@ impl<R: RpcConnection, I: Indexer<R> + IndexerType<R>> EpochManager<R, I> {
merkle_tree: tree.tree_accounts.merkle_tree,
output_queue: tree.tree_accounts.queue,
ixs_per_tx: self.config.transaction_config.batch_ixs_per_tx,
duration: light_slot_timeout,
start_time: Instant::now(),
};

let start_time = Instant::now();
Expand Down Expand Up @@ -1258,6 +1261,7 @@ pub async fn run_service<R: RpcConnection, I: Indexer<R> + IndexerType<R>>(
if retry_count < config.retry_config.max_retries {
debug!("Retrying in {:?}", retry_delay);
sleep(retry_delay).await;
tokio::task::yield_now().await;
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
} else {
error!(
Expand Down
20 changes: 11 additions & 9 deletions forester/src/rollover/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,18 +296,19 @@ pub async fn perform_state_merkle_tree_rollover_forester<R: RpcConnection>(
)
.await;
let blockhash = context.get_latest_blockhash().await.unwrap();
let signers = &vec![
payer,
&new_queue_keypair,
&new_address_merkle_tree_keypair,
&new_cpi_context_keypair,
];
let transaction = Transaction::new_signed_with_payer(
&instructions,
Some(&payer.pubkey()),
&vec![
&payer,
&new_queue_keypair,
&new_address_merkle_tree_keypair,
&new_cpi_context_keypair,
],
signers,
blockhash,
);
context.process_transaction(transaction).await
context.process_transaction(transaction, signers).await
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -333,13 +334,14 @@ pub async fn perform_address_merkle_tree_rollover<R: RpcConnection>(
)
.await;
let blockhash = context.get_latest_blockhash().await.unwrap();
let signers = &vec![payer, &new_queue_keypair, &new_address_merkle_tree_keypair];
let transaction = Transaction::new_signed_with_payer(
&instructions,
Some(&payer.pubkey()),
&vec![&payer, &new_queue_keypair, &new_address_merkle_tree_keypair],
signers,
blockhash,
);
context.process_transaction(transaction).await
context.process_transaction(transaction, signers).await
}

#[allow(clippy::too_many_arguments)]
Expand Down
4 changes: 2 additions & 2 deletions forester/src/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,14 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(

let cancel_signal_clone = cancel_signal.clone();
let deadline = timeout_deadline;

let payer_clone = payer.insecure_clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary since we are just passing a reference?

tokio::spawn(async move {
if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= deadline {
return;
}

if let Ok(mut rpc) = pool_clone.get_connection().await {
let result = rpc.process_transaction_with_config(tx, config).await;
let result = rpc.process_transaction_with_config(tx, &[&payer_clone], config).await;
if !cancel_signal_clone.load(Ordering::SeqCst) {
let _ = tx_sender.send(result).await;
}
Expand Down
2 changes: 1 addition & 1 deletion forester/src/slot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub async fn wait_until_slot_reached<R: RpcConnection>(
let current_estimated_slot = slot_tracker.estimated_current_slot();

if current_estimated_slot >= target_slot {
// Double-check with actual RPC call
let actual_slot = rpc.get_slot().await?;
if actual_slot >= target_slot {
break;
Expand All @@ -106,6 +105,7 @@ pub async fn wait_until_slot_reached<R: RpcConnection>(
sleep_duration.as_secs_f64()
);
sleep(sleep_duration).await;
tokio::task::yield_now().await;
}

trace!("Slot {} reached", target_slot);
Expand Down
4 changes: 2 additions & 2 deletions forester/src/smart_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/optimized_transaction.rs
// optimized for forester client
use std::time::{Duration, Instant};

use light_client::{rpc::RpcConnection, rpc_pool::SolanaConnectionManager};
use solana_client::rpc_config::RpcSendTransactionConfig;
use solana_sdk::{
Expand Down Expand Up @@ -81,6 +80,7 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>(
pub async fn send_and_confirm_transaction<'a, R: RpcConnection>(
connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager<R>>,
transaction: &Transaction,
signers: &[&Keypair],
send_transaction_config: RpcSendTransactionConfig,
last_valid_block_height: u64,
timeout: Duration,
Expand All @@ -90,7 +90,7 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>(
while Instant::now().duration_since(start_time) < timeout
&& connection.get_slot().await? <= last_valid_block_height
{
let result = connection.send_transaction_with_config(transaction, send_transaction_config);
let result = connection.send_transaction_with_config(transaction, signers, send_transaction_config);

match result.await {
Ok(signature) => {
Expand Down
Loading
Loading