Skip to content

Commit b4115d2

Browse files
apollo_gateway: extract async mempool query from blocking task
1 parent 081adc6 commit b4115d2

File tree

4 files changed

+90
-94
lines changed

4 files changed

+90
-94
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ use apollo_gateway_types::gateway_types::{
1919
InvokeGatewayOutput,
2020
};
2121
use apollo_infra::component_definitions::ComponentStarter;
22-
use apollo_mempool_types::communication::{AddTransactionArgsWrapper, SharedMempoolClient};
22+
use apollo_mempool_types::communication::{
23+
AddTransactionArgsWrapper,
24+
MempoolClientError,
25+
SharedMempoolClient,
26+
};
2327
use apollo_mempool_types::mempool_types::AddTransactionArgs;
2428
use apollo_network_types::network_types::BroadcastedMessageMetadata;
2529
use apollo_proc_macros::sequencer_latency_histogram;
@@ -34,6 +38,7 @@ use starknet_api::rpc_transaction::{
3438
RpcDeclareTransaction,
3539
RpcTransaction,
3640
};
41+
use starknet_types_core::felt::Felt;
3742
use tracing::{debug, info, warn, Span};
3843

3944
use crate::errors::{
@@ -159,16 +164,14 @@ impl Gateway {
159164
})?;
160165

161166
let mut blocking_task =
162-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current())
163-
.await
164-
.map_err(|e| {
165-
info!(
166-
"Gateway validation failed for tx with signature: {:?} with error: {}",
167-
&tx_signature, e
168-
);
169-
metric_counters.record_add_tx_failure(&e);
170-
e
171-
})?;
167+
ProcessTxBlockingTask::new(self, executable_tx.clone()).await.map_err(|e| {
168+
info!(
169+
"Gateway validation failed for tx with signature: {:?} with error: {}",
170+
&tx_signature, e
171+
);
172+
metric_counters.record_add_tx_failure(&e);
173+
e
174+
})?;
172175

173176
let state_reader = blocking_task.get_state_reader().await?;
174177

@@ -178,12 +181,26 @@ impl Gateway {
178181
.await
179182
.map_err(|e| StarknetError::internal_with_logging("Failed to get account nonce", e))?;
180183

184+
let is_account_tx_in_mempool =
185+
if should_query_mempool_for_account_tx(&executable_tx, account_nonce) {
186+
self.mempool_client
187+
.account_tx_in_pool_or_recent_block(executable_tx.contract_address())
188+
.await
189+
} else {
190+
Ok(false)
191+
};
192+
181193
let stateful_tx_validator = blocking_task.create_stateful_validator(state_reader).await?;
182194

183-
// Run the blocking task in the current span.
184195
let curr_span = Span::current();
185196
let handle = tokio::task::spawn_blocking(move || {
186-
curr_span.in_scope(|| blocking_task.process_tx(account_nonce, stateful_tx_validator))
197+
curr_span.in_scope(|| {
198+
blocking_task.process_tx(
199+
account_nonce,
200+
stateful_tx_validator,
201+
is_account_tx_in_mempool,
202+
)
203+
})
187204
});
188205
let handle_result = handle.await;
189206
let nonce = match handle_result {
@@ -256,28 +273,31 @@ impl Gateway {
256273
}
257274
}
258275

276+
// Returns true if we should prefetch mempool data to potentially skip stateful validations:
277+
// applies only to Invoke with nonce == 1 and when the account nonce is zero (pre-deploy state).
278+
fn should_query_mempool_for_account_tx(
279+
executable_tx: &AccountTransaction,
280+
account_nonce: Nonce,
281+
) -> bool {
282+
matches!(executable_tx, AccountTransaction::Invoke(_))
283+
&& executable_tx.nonce() == Nonce(Felt::ONE)
284+
&& account_nonce == Nonce(Felt::ZERO)
285+
}
286+
259287
/// CPU-intensive transaction processing, spawned in a blocking thread to avoid blocking other tasks
260288
/// from running.
261289
struct ProcessTxBlockingTask {
262290
state_reader_factory: Arc<dyn StateReaderFactory>,
263291
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
264-
mempool_client: SharedMempoolClient,
265292
executable_tx: AccountTransaction,
266-
runtime: tokio::runtime::Handle,
267293
}
268294

269295
impl ProcessTxBlockingTask {
270-
pub async fn new(
271-
gateway: &Gateway,
272-
executable_tx: AccountTransaction,
273-
runtime: tokio::runtime::Handle,
274-
) -> GatewayResult<Self> {
296+
pub async fn new(gateway: &Gateway, executable_tx: AccountTransaction) -> GatewayResult<Self> {
275297
Ok(Self {
276298
state_reader_factory: gateway.state_reader_factory.clone(),
277299
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
278-
mempool_client: gateway.mempool_client.clone(),
279300
executable_tx,
280-
runtime,
281301
})
282302
}
283303

@@ -316,12 +336,12 @@ impl ProcessTxBlockingTask {
316336
self,
317337
account_nonce: Nonce,
318338
mut stateful_tx_validator: Box<dyn StatefulTransactionValidatorTrait>,
339+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
319340
) -> GatewayResult<Nonce> {
320341
let nonce = stateful_tx_validator.extract_state_nonce_and_run_validations(
321342
&self.executable_tx,
322343
account_nonce,
323-
self.mempool_client,
324-
self.runtime,
344+
is_account_tx_in_mempool,
325345
)?;
326346

327347
Ok(nonce)

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTr
118118
mock_stateless_transaction_validator
119119
}
120120

121+
#[fixture]
122+
fn default_mock_mempool_client() -> MockMempoolClient {
123+
let mut mock_mempool_client = MockMempoolClient::new();
124+
// Gateway tests don't require the mempool client to be checked, any value is fine.
125+
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| Ok(false));
126+
mock_mempool_client
127+
}
128+
121129
#[fixture]
122130
fn mock_dependencies() -> MockDependencies {
123131
let config = GatewayConfig {
@@ -130,7 +138,7 @@ fn mock_dependencies() -> MockDependencies {
130138
};
131139
let state_reader_factory =
132140
local_test_state_reader_factory(CairoVersion::Cairo1(RunnableCairo1::Casm), true);
133-
let mock_mempool_client = MockMempoolClient::new();
141+
let mock_mempool_client = default_mock_mempool_client();
134142
let mock_transaction_converter = MockTransactionConverterTrait::new();
135143
let mock_stateless_transaction_validator = mock_stateless_transaction_validator();
136144
MockDependencies {
@@ -318,13 +326,12 @@ async fn run_add_tx_and_extract_metrics(
318326
async fn process_tx_task(
319327
stateful_tx_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
320328
) -> ProcessTxBlockingTask {
321-
let state_reader_factory = Arc::new(MockStateReaderFactory::new());
329+
let state_reader_factory: Arc<dyn crate::state_reader::StateReaderFactory> =
330+
Arc::new(MockStateReaderFactory::new());
322331
ProcessTxBlockingTask {
323332
state_reader_factory,
324333
stateful_tx_validator_factory: Arc::new(stateful_tx_validator_factory),
325-
mempool_client: Arc::new(MockMempoolClient::new()),
326334
executable_tx: executable_invoke_tx(invoke_args()),
327-
runtime: tokio::runtime::Handle::current(),
328335
}
329336
}
330337

@@ -562,12 +569,16 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
562569

563570
mock_stateful_transaction_validator
564571
.expect_extract_state_nonce_and_run_validations()
565-
.return_once(|_, _, _, _| Err(expected_error));
572+
.return_once(|_, _, _| Err(expected_error));
566573

567574
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory).await;
568-
let account_nonce = nonce!(0);
575+
569576
let result = tokio::task::spawn_blocking(move || {
570-
process_tx_task.process_tx(account_nonce, Box::new(mock_stateful_transaction_validator))
577+
process_tx_task.process_tx(
578+
nonce!(0),
579+
Box::new(mock_stateful_transaction_validator),
580+
Ok(false),
581+
)
571582
})
572583
.await
573584
.unwrap();
@@ -603,7 +614,6 @@ async fn add_tx_returns_error_when_getting_state_reader_fails(
603614
mut mock_dependencies: MockDependencies,
604615
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
605616
) {
606-
// Prepare transaction conversion to reach instantiation.
607617
let tx_args = invoke_args();
608618
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
609619

@@ -613,7 +623,6 @@ async fn add_tx_returns_error_when_getting_state_reader_fails(
613623
.expect_get_state_reader_for_validation()
614624
.return_once(|_| Err(expected_error));
615625

616-
// Build gateway and inject the failing factory.
617626
let mut gateway = mock_dependencies.gateway();
618627
gateway.stateful_tx_validator_factory = Arc::new(mock_stateful_transaction_validator_factory);
619628

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use apollo_gateway_types::deprecated_gateway_error::{
77
StarknetErrorCode,
88
};
99
use apollo_gateway_types::errors::GatewaySpecError;
10-
use apollo_mempool_types::communication::SharedMempoolClient;
10+
use apollo_mempool_types::communication::MempoolClientError;
1111
use apollo_proc_macros::sequencer_latency_histogram;
1212
use async_trait::async_trait;
1313
use blockifier::blockifier::stateful_validator::{
@@ -149,8 +149,7 @@ pub trait StatefulTransactionValidatorTrait: Send {
149149
&mut self,
150150
executable_tx: &ExecutableTransaction,
151151
account_nonce: Nonce,
152-
mempool_client: SharedMempoolClient,
153-
runtime: tokio::runtime::Handle,
152+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
154153
) -> StatefulTransactionValidatorResult<Nonce>;
155154

156155
fn get_nonce(
@@ -171,10 +170,9 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidatorTra
171170
&mut self,
172171
executable_tx: &ExecutableTransaction,
173172
account_nonce: Nonce,
174-
mempool_client: SharedMempoolClient,
175-
runtime: tokio::runtime::Handle,
173+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
176174
) -> StatefulTransactionValidatorResult<Nonce> {
177-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
175+
self.run_transaction_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
178176
Ok(account_nonce)
179177
}
180178

@@ -193,11 +191,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
193191
&mut self,
194192
executable_tx: &ExecutableTransaction,
195193
account_nonce: Nonce,
196-
mempool_client: SharedMempoolClient,
197-
runtime: tokio::runtime::Handle,
194+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
198195
) -> StatefulTransactionValidatorResult<()> {
199196
self.validate_state_preconditions(executable_tx, account_nonce)?;
200-
self.run_validate_entry_point(executable_tx, account_nonce, mempool_client, runtime)?;
197+
self.run_validate_entry_point(executable_tx, account_nonce, is_account_tx_in_mempool)?;
201198
Ok(())
202199
}
203200

@@ -294,11 +291,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
294291
&mut self,
295292
executable_tx: &ExecutableTransaction,
296293
account_nonce: Nonce,
297-
mempool_client: SharedMempoolClient,
298-
runtime: tokio::runtime::Handle,
294+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
299295
) -> StatefulTransactionValidatorResult<()> {
300296
let skip_validate =
301-
skip_stateful_validations(executable_tx, account_nonce, mempool_client, runtime)?;
297+
skip_stateful_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
302298
let only_query = false;
303299
let charge_fee = enforce_fee(executable_tx, only_query);
304300
let strict_nonce_check = false;
@@ -354,8 +350,7 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
354350
fn skip_stateful_validations(
355351
tx: &ExecutableTransaction,
356352
account_nonce: Nonce,
357-
mempool_client: SharedMempoolClient,
358-
runtime: tokio::runtime::Handle,
353+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
359354
) -> StatefulTransactionValidatorResult<bool> {
360355
if let ExecutableTransaction::Invoke(ExecutableInvokeTransaction { tx, .. }) = tx {
361356
// check if the transaction nonce is 1, meaning it is post deploy_account, and the
@@ -367,10 +362,7 @@ fn skip_stateful_validations(
367362
// to check if the account exists in the mempool since it means that either it has a
368363
// deploy_account transaction or transactions with future nonces that passed
369364
// validations.
370-
// TODO(Itamar): Avoid blocking here by moving this mempool existence check to the async
371-
// path prior to spawning the blocking task, and pass the result into validation.
372-
return runtime
373-
.block_on(mempool_client.account_tx_in_pool_or_recent_block(tx.sender_address()))
365+
return is_account_tx_in_mempool
374366
.map_err(|err| mempool_client_err_to_deprecated_gw_err(&tx.signature(), err))
375367
.inspect(|exists| {
376368
if *exists {

0 commit comments

Comments
 (0)