Skip to content

Commit fc642eb

Browse files
apollo_gateway: extract async mempool query from blocking task
1 parent 6af545f commit fc642eb

File tree

4 files changed

+94
-93
lines changed

4 files changed

+94
-93
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ use apollo_gateway_types::gateway_types::{
1919
InvokeGatewayOutput,
2020
};
2121
use apollo_infra::component_definitions::ComponentStarter;
22-
use apollo_mempool_types::communication::{AddTransactionArgsWrapper, SharedMempoolClient};
22+
use apollo_mempool_types::communication::{
23+
AddTransactionArgsWrapper,
24+
MempoolClientError,
25+
SharedMempoolClient,
26+
};
2327
use apollo_mempool_types::mempool_types::AddTransactionArgs;
2428
use apollo_network_types::network_types::BroadcastedMessageMetadata;
2529
use apollo_proc_macros::sequencer_latency_histogram;
@@ -34,6 +38,7 @@ use starknet_api::rpc_transaction::{
3438
RpcDeclareTransaction,
3539
RpcTransaction,
3640
};
41+
use starknet_types_core::felt::Felt;
3742
use tracing::{debug, info, warn, Span};
3843

3944
use crate::errors::{
@@ -160,16 +165,14 @@ impl Gateway {
160165
})?;
161166

162167
let mut blocking_task =
163-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current())
164-
.await
165-
.map_err(|e| {
166-
info!(
167-
"Gateway validation failed for tx with signature: {:?} with error: {}",
168-
&tx_signature, e
169-
);
170-
metric_counters.record_add_tx_failure(&e);
171-
e
172-
})?;
168+
ProcessTxBlockingTask::new(self, executable_tx.clone()).await.map_err(|e| {
169+
info!(
170+
"Gateway validation failed for tx with signature: {:?} with error: {}",
171+
&tx_signature, e
172+
);
173+
metric_counters.record_add_tx_failure(&e);
174+
e
175+
})?;
173176

174177
let state_reader = blocking_task.get_state_reader().await?;
175178

@@ -179,12 +182,26 @@ impl Gateway {
179182
.await
180183
.map_err(|e| StarknetError::internal_with_logging("Failed to get account nonce", e))?;
181184

185+
let is_account_tx_in_mempool =
186+
if should_query_mempool_for_account_tx(&executable_tx, account_nonce) {
187+
self.mempool_client
188+
.account_tx_in_pool_or_recent_block(executable_tx.contract_address())
189+
.await
190+
} else {
191+
Ok(false)
192+
};
193+
182194
let stateful_tx_validator = blocking_task.create_stateful_validator(state_reader).await?;
183195

184-
// Run the blocking task in the current span.
185196
let curr_span = Span::current();
186197
let handle = tokio::task::spawn_blocking(move || {
187-
curr_span.in_scope(|| blocking_task.process_tx(account_nonce, stateful_tx_validator))
198+
curr_span.in_scope(|| {
199+
blocking_task.process_tx(
200+
account_nonce,
201+
stateful_tx_validator,
202+
is_account_tx_in_mempool,
203+
)
204+
})
188205
});
189206
let handle_result = handle.await;
190207
let nonce = match handle_result {
@@ -257,28 +274,31 @@ impl Gateway {
257274
}
258275
}
259276

277+
// Returns true if we should prefetch mempool data to potentially skip stateful validations:
278+
// applies only to Invoke with nonce == 1 and when the account nonce is zero (pre-deploy state).
279+
fn should_query_mempool_for_account_tx(
280+
executable_tx: &AccountTransaction,
281+
account_nonce: Nonce,
282+
) -> bool {
283+
matches!(executable_tx, AccountTransaction::Invoke(_))
284+
&& executable_tx.nonce() == Nonce(Felt::ONE)
285+
&& account_nonce == Nonce(Felt::ZERO)
286+
}
287+
260288
/// CPU-intensive transaction processing, spawned in a blocking thread to avoid blocking other tasks
261289
/// from running.
262290
struct ProcessTxBlockingTask {
263291
state_reader_factory: Arc<dyn StateReaderFactory>,
264292
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
265-
mempool_client: SharedMempoolClient,
266293
executable_tx: AccountTransaction,
267-
runtime: tokio::runtime::Handle,
268294
}
269295

270296
impl ProcessTxBlockingTask {
271-
pub async fn new(
272-
gateway: &Gateway,
273-
executable_tx: AccountTransaction,
274-
runtime: tokio::runtime::Handle,
275-
) -> GatewayResult<Self> {
297+
pub async fn new(gateway: &Gateway, executable_tx: AccountTransaction) -> GatewayResult<Self> {
276298
Ok(Self {
277299
state_reader_factory: gateway.state_reader_factory.clone(),
278300
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
279-
mempool_client: gateway.mempool_client.clone(),
280301
executable_tx,
281-
runtime,
282302
})
283303
}
284304

@@ -317,12 +337,12 @@ impl ProcessTxBlockingTask {
317337
self,
318338
account_nonce: Nonce,
319339
mut stateful_tx_validator: Box<dyn StatefulTransactionValidatorTrait>,
340+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
320341
) -> GatewayResult<Nonce> {
321342
let nonce = stateful_tx_validator.extract_state_nonce_and_run_validations(
322343
&self.executable_tx,
323344
account_nonce,
324-
self.mempool_client,
325-
self.runtime,
345+
is_account_tx_in_mempool,
326346
)?;
327347

328348
Ok(nonce)

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTr
118118
mock_stateless_transaction_validator
119119
}
120120

121+
#[fixture]
122+
fn default_mock_mempool_client() -> MockMempoolClient {
123+
let mut mock_mempool_client = MockMempoolClient::new();
124+
// Gateway tests don't require the mempool client to be checked, any value is fine.
125+
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| Ok(false));
126+
mock_mempool_client
127+
}
128+
121129
#[fixture]
122130
fn mock_dependencies() -> MockDependencies {
123131
let config = GatewayConfig {
@@ -130,7 +138,7 @@ fn mock_dependencies() -> MockDependencies {
130138
};
131139
let state_reader_factory =
132140
local_test_state_reader_factory(CairoVersion::Cairo1(RunnableCairo1::Casm), true);
133-
let mock_mempool_client = MockMempoolClient::new();
141+
let mock_mempool_client = default_mock_mempool_client();
134142
let mock_transaction_converter = MockTransactionConverterTrait::new();
135143
let mock_stateless_transaction_validator = mock_stateless_transaction_validator();
136144
MockDependencies {
@@ -318,13 +326,12 @@ async fn run_add_tx_and_extract_metrics(
318326
async fn process_tx_task(
319327
stateful_tx_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
320328
) -> ProcessTxBlockingTask {
321-
let state_reader_factory = Arc::new(MockStateReaderFactory::new());
329+
let state_reader_factory: Arc<dyn crate::state_reader::StateReaderFactory> =
330+
Arc::new(MockStateReaderFactory::new());
322331
ProcessTxBlockingTask {
323332
state_reader_factory,
324333
stateful_tx_validator_factory: Arc::new(stateful_tx_validator_factory),
325-
mempool_client: Arc::new(MockMempoolClient::new()),
326334
executable_tx: executable_invoke_tx(invoke_args()),
327-
runtime: tokio::runtime::Handle::current(),
328335
}
329336
}
330337

@@ -562,12 +569,16 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
562569

563570
mock_stateful_transaction_validator
564571
.expect_extract_state_nonce_and_run_validations()
565-
.return_once(|_, _, _, _| Err(expected_error));
572+
.return_once(|_, _, _| Err(expected_error));
566573

567574
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory).await;
568-
let account_nonce = nonce!(0);
575+
569576
let result = tokio::task::spawn_blocking(move || {
570-
process_tx_task.process_tx(account_nonce, Box::new(mock_stateful_transaction_validator))
577+
process_tx_task.process_tx(
578+
nonce!(0),
579+
Box::new(mock_stateful_transaction_validator),
580+
Ok(false),
581+
)
571582
})
572583
.await
573584
.unwrap();
@@ -603,21 +614,22 @@ async fn add_tx_returns_error_when_instantiating_validator_fails(
603614
mut mock_dependencies: MockDependencies,
604615
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
605616
) {
606-
// Prepare transaction conversion to reach instantiation.
607617
let tx_args = invoke_args();
608618
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
609619

610-
// Fail validator instantiation.
611620
let error_code = StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into());
612621
let expected_error = StarknetError { code: error_code.clone(), message: "placeholder".into() };
613622
mock_stateful_transaction_validator_factory
614623
.expect_get_state_reader_for_validation()
615624
.return_once(|_| Box::pin(async { Err::<_, _>(expected_error) }));
616625

617-
// Build gateway and inject the failing factory.
618626
let mut gateway = mock_dependencies.gateway();
619627
gateway.stateful_tx_validator_factory = Arc::new(mock_stateful_transaction_validator_factory);
620628

621629
let err = gateway.add_tx(tx_args.get_rpc_tx(), p2p_message_metadata()).await.unwrap_err();
622630
assert_eq!(err.code, error_code);
623631
}
632+
633+
// With current implementation, validator instantiation occurs inside ProcessTxBlockingTask::new()
634+
// and unwraps on error. Therefore, add_tx will panic if instantiation fails.
635+
// Gateway should return an error (not panic) when validator instantiation fails.

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use apollo_gateway_types::deprecated_gateway_error::{
55
StarknetErrorCode,
66
};
77
use apollo_gateway_types::errors::GatewaySpecError;
8-
use apollo_mempool_types::communication::SharedMempoolClient;
8+
use apollo_mempool_types::communication::MempoolClientError;
99
use apollo_proc_macros::sequencer_latency_histogram;
1010
use async_trait::async_trait;
1111
use blockifier::blockifier::stateful_validator::{
@@ -147,8 +147,7 @@ pub trait StatefulTransactionValidatorTrait: Send {
147147
&mut self,
148148
executable_tx: &ExecutableTransaction,
149149
account_nonce: Nonce,
150-
mempool_client: SharedMempoolClient,
151-
runtime: tokio::runtime::Handle,
150+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
152151
) -> StatefulTransactionValidatorResult<Nonce>;
153152

154153
fn get_nonce(
@@ -169,10 +168,9 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidatorTra
169168
&mut self,
170169
executable_tx: &ExecutableTransaction,
171170
account_nonce: Nonce,
172-
mempool_client: SharedMempoolClient,
173-
runtime: tokio::runtime::Handle,
171+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
174172
) -> StatefulTransactionValidatorResult<Nonce> {
175-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
173+
self.run_transaction_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
176174
Ok(account_nonce)
177175
}
178176

@@ -191,11 +189,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
191189
&mut self,
192190
executable_tx: &ExecutableTransaction,
193191
account_nonce: Nonce,
194-
mempool_client: SharedMempoolClient,
195-
runtime: tokio::runtime::Handle,
192+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
196193
) -> StatefulTransactionValidatorResult<()> {
197194
self.validate_state_preconditions(executable_tx, account_nonce)?;
198-
self.run_validate_entry_point(executable_tx, account_nonce, mempool_client, runtime)?;
195+
self.run_validate_entry_point(executable_tx, account_nonce, is_account_tx_in_mempool)?;
199196
Ok(())
200197
}
201198

@@ -292,11 +289,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
292289
&mut self,
293290
executable_tx: &ExecutableTransaction,
294291
account_nonce: Nonce,
295-
mempool_client: SharedMempoolClient,
296-
runtime: tokio::runtime::Handle,
292+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
297293
) -> StatefulTransactionValidatorResult<()> {
298294
let skip_validate =
299-
skip_stateful_validations(executable_tx, account_nonce, mempool_client, runtime)?;
295+
skip_stateful_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
300296
let only_query = false;
301297
let charge_fee = enforce_fee(executable_tx, only_query);
302298
let strict_nonce_check = false;
@@ -352,8 +348,7 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
352348
fn skip_stateful_validations(
353349
tx: &ExecutableTransaction,
354350
account_nonce: Nonce,
355-
mempool_client: SharedMempoolClient,
356-
runtime: tokio::runtime::Handle,
351+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
357352
) -> StatefulTransactionValidatorResult<bool> {
358353
if let ExecutableTransaction::Invoke(ExecutableInvokeTransaction { tx, .. }) = tx {
359354
// check if the transaction nonce is 1, meaning it is post deploy_account, and the
@@ -365,8 +360,7 @@ fn skip_stateful_validations(
365360
// to check if the account exists in the mempool since it means that either it has a
366361
// deploy_account transaction or transactions with future nonces that passed
367362
// validations.
368-
return runtime
369-
.block_on(mempool_client.account_tx_in_pool_or_recent_block(tx.sender_address()))
363+
return is_account_tx_in_mempool
370364
.map_err(|err| mempool_client_err_to_deprecated_gw_err(&tx.signature(), err))
371365
.inspect(|exists| {
372366
if *exists {

0 commit comments

Comments
 (0)