Skip to content

Commit f8cf519

Browse files
apollo_gateway: extract async mempool query from blocking task
1 parent c58285c commit f8cf519

File tree

4 files changed

+104
-110
lines changed

4 files changed

+104
-110
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ use apollo_gateway_types::gateway_types::{
1919
InvokeGatewayOutput,
2020
};
2121
use apollo_infra::component_definitions::ComponentStarter;
22-
use apollo_mempool_types::communication::{AddTransactionArgsWrapper, SharedMempoolClient};
22+
use apollo_mempool_types::communication::{
23+
AddTransactionArgsWrapper,
24+
MempoolClientError,
25+
SharedMempoolClient,
26+
};
2327
use apollo_mempool_types::mempool_types::AddTransactionArgs;
2428
use apollo_network_types::network_types::BroadcastedMessageMetadata;
2529
use apollo_proc_macros::sequencer_latency_histogram;
@@ -33,6 +37,7 @@ use starknet_api::rpc_transaction::{
3337
RpcDeclareTransaction,
3438
RpcTransaction,
3539
};
40+
use starknet_types_core::felt::Felt;
3641
use tracing::{debug, info, warn, Span};
3742

3843
use crate::errors::{
@@ -152,32 +157,31 @@ impl Gateway {
152157
})?;
153158

154159
let mut blocking_task =
155-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current())
156-
.map_err(|e| {
157-
info!(
158-
"Gateway validation failed for tx with signature: {:?} with error: {}",
159-
tx.signature(),
160-
e
161-
);
162-
metric_counters.record_add_tx_failure(&e);
160+
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current())
161+
.map_err(|e| {
162+
info!(
163+
"Gateway validation failed for tx with signature: {:?} with error: {}",
164+
tx.signature(),
163165
e
164-
})?;
165-
166-
// Obtain account nonce before spawning the blocking computation.
167-
let account_nonce = blocking_task.get_nonce().map_err(|e| {
168-
info!(
169-
"Gateway validation failed for tx with signature: {:?} with error: {}",
170-
tx.signature(),
166+
);
167+
metric_counters.record_add_tx_failure(&e);
171168
e
172-
);
173-
metric_counters.record_add_tx_failure(&e);
174-
e
175-
})?;
169+
})?;
170+
171+
let account_nonce = blocking_task.get_nonce()?;
172+
173+
let is_account_tx_in_mempool =
174+
if should_query_mempool_for_account_tx(&executable_tx, account_nonce) {
175+
self.mempool_client
176+
.account_tx_in_pool_or_recent_block(executable_tx.contract_address())
177+
.await
178+
} else {
179+
Ok(false)
180+
};
176181

177-
// Run the blocking task in the current span.
178182
let curr_span = Span::current();
179183
let handle = tokio::task::spawn_blocking(move || {
180-
curr_span.in_scope(|| blocking_task.process_tx(account_nonce))
184+
curr_span.in_scope(|| blocking_task.process_tx(account_nonce, is_account_tx_in_mempool))
181185
});
182186
let handle_result = handle.await;
183187
let nonce = match handle_result {
@@ -253,43 +257,47 @@ impl Gateway {
253257
}
254258
}
255259

260+
// Returns true if we should prefetch mempool data to potentially skip stateful validations:
261+
// applies only to Invoke with nonce == 1 and when the account nonce is zero (pre-deploy state).
262+
fn should_query_mempool_for_account_tx(
263+
executable_tx: &AccountTransaction,
264+
account_nonce: Nonce,
265+
) -> bool {
266+
matches!(executable_tx, AccountTransaction::Invoke(_))
267+
&& executable_tx.nonce() == Nonce(Felt::ONE)
268+
&& account_nonce == Nonce(Felt::ZERO)
269+
}
270+
256271
/// CPU-intensive transaction processing, spawned in a blocking thread to avoid blocking other tasks
257272
/// from running.
258273
struct ProcessTxBlockingTask {
259274
stateful_tx_validator: Box<dyn StatefulTransactionValidatorTrait + Send>,
260-
mempool_client: SharedMempoolClient,
261275
executable_tx: AccountTransaction,
262-
runtime: tokio::runtime::Handle,
263276
}
264277

265278
impl ProcessTxBlockingTask {
266-
pub fn new(
267-
gateway: &Gateway,
268-
executable_tx: AccountTransaction,
269-
runtime: tokio::runtime::Handle,
270-
) -> GatewayResult<Self> {
279+
pub fn new(gateway: &Gateway, executable_tx: AccountTransaction) -> GatewayResult<Self> {
271280
let stateful_tx_validator = gateway
272281
.stateful_tx_validator_factory
273282
.instantiate_validator(gateway.state_reader_factory.as_ref())?;
274-
Ok(Self {
275-
stateful_tx_validator,
276-
mempool_client: gateway.mempool_client.clone(),
277-
executable_tx,
278-
runtime,
279-
})
283+
Ok(Self { stateful_tx_validator, executable_tx })
280284
}
281285

282286
fn get_nonce(&mut self) -> GatewayResult<Nonce> {
283287
let address = self.executable_tx.contract_address();
284-
self.stateful_tx_validator.get_nonce(address)
288+
let nonce = self.stateful_tx_validator.get_nonce(address)?;
289+
Ok(nonce)
285290
}
286291

287-
fn process_tx(mut self, account_nonce: Nonce) -> GatewayResult<Nonce> {
292+
fn process_tx(
293+
mut self,
294+
account_nonce: Nonce,
295+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
296+
) -> GatewayResult<Nonce> {
288297
let nonce = self.stateful_tx_validator.extract_state_nonce_and_run_validations(
289298
&self.executable_tx,
290299
account_nonce,
291-
self.mempool_client,
292-
self.runtime,
300+
is_account_tx_in_mempool,
293301
)?;
294302

295303
Ok(nonce)

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTr
118118
mock_stateless_transaction_validator
119119
}
120120

121+
#[fixture]
122+
fn default_mock_mempool_client() -> MockMempoolClient {
123+
let mut mock_mempool_client = MockMempoolClient::new();
124+
// Gateway tests don't require the mempool client to be checked, any value is fine.
125+
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| Ok(false));
126+
mock_mempool_client
127+
}
128+
121129
#[fixture]
122130
fn mock_dependencies() -> MockDependencies {
123131
let config = GatewayConfig {
@@ -129,7 +137,7 @@ fn mock_dependencies() -> MockDependencies {
129137
};
130138
let state_reader_factory =
131139
local_test_state_reader_factory(CairoVersion::Cairo1(RunnableCairo1::Casm), true);
132-
let mock_mempool_client = MockMempoolClient::new();
140+
let mock_mempool_client = default_mock_mempool_client();
133141
let mock_transaction_converter = MockTransactionConverterTrait::new();
134142
let mock_stateless_transaction_validator = mock_stateless_transaction_validator();
135143
MockDependencies {
@@ -318,16 +326,12 @@ fn process_tx_task(
318326
stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
319327
) -> ProcessTxBlockingTask {
320328
let state_reader_factory = Arc::new(MockStateReaderFactory::new());
321-
let stateful_tx_validator = StatefulTransactionValidatorFactoryTrait::instantiate_validator(
322-
&stateful_transaction_validator_factory,
323-
state_reader_factory.as_ref(),
324-
)
325-
.expect("instantiate_validator should be mocked in tests");
329+
let stateful_tx_validator = stateful_transaction_validator_factory
330+
.instantiate_validator(state_reader_factory.as_ref())
331+
.expect("instantiate_validator should be mocked in tests");
326332
ProcessTxBlockingTask {
327333
stateful_tx_validator,
328-
mempool_client: Arc::new(MockMempoolClient::new()),
329334
executable_tx: executable_invoke_tx(invoke_args()),
330-
runtime: tokio::runtime::Handle::current(),
331335
}
332336
}
333337

@@ -572,10 +576,11 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
572576
.return_once(|_| Ok(Box::new(mock_stateful_transaction_validator)));
573577

574578
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory);
575-
let account_nonce = nonce!(0);
576-
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx(account_nonce))
577-
.await
578-
.unwrap();
579+
580+
let result =
581+
tokio::task::spawn_blocking(move || process_tx_task.process_tx(nonce!(0), Ok(false)))
582+
.await
583+
.unwrap();
579584

580585
assert!(result.is_err());
581586
assert_eq!(result.unwrap_err().code, error_code);
@@ -608,21 +613,22 @@ async fn add_tx_returns_error_when_instantiating_validator_fails(
608613
mut mock_dependencies: MockDependencies,
609614
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
610615
) {
611-
// Prepare transaction conversion to reach instantiation.
612616
let tx_args = invoke_args();
613617
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
614618

615-
// Fail validator instantiation.
616619
let error_code = StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into());
617620
let expected_error = StarknetError { code: error_code.clone(), message: "placeholder".into() };
618621
mock_stateful_transaction_validator_factory
619622
.expect_instantiate_validator()
620623
.return_once(|_| Err(expected_error));
621624

622-
// Build gateway and inject the failing factory.
623625
let mut gateway = mock_dependencies.gateway();
624626
gateway.stateful_tx_validator_factory = Arc::new(mock_stateful_transaction_validator_factory);
625627

626628
let err = gateway.add_tx(tx_args.get_rpc_tx(), p2p_message_metadata()).await.unwrap_err();
627629
assert_eq!(err.code, error_code);
628630
}
631+
632+
// With current implementation, validator instantiation occurs inside ProcessTxBlockingTask::new()
633+
// and unwraps on error. Therefore, add_tx will panic if instantiation fails.
634+
// Gateway should return an error (not panic) when validator instantiation fails.

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use apollo_gateway_types::deprecated_gateway_error::{
55
StarknetErrorCode,
66
};
77
use apollo_gateway_types::errors::GatewaySpecError;
8-
use apollo_mempool_types::communication::SharedMempoolClient;
8+
use apollo_mempool_types::communication::MempoolClientError;
99
use apollo_proc_macros::sequencer_latency_histogram;
1010
use blockifier::blockifier::stateful_validator::{
1111
StatefulValidator,
@@ -104,8 +104,12 @@ pub trait StatefulTransactionValidatorTrait: Send {
104104
&mut self,
105105
executable_tx: &ExecutableTransaction,
106106
account_nonce: Nonce,
107-
mempool_client: SharedMempoolClient,
108-
runtime: tokio::runtime::Handle,
107+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
108+
) -> StatefulTransactionValidatorResult<Nonce>;
109+
110+
fn get_nonce(
111+
&mut self,
112+
account_address: ContractAddress,
109113
) -> StatefulTransactionValidatorResult<Nonce>;
110114

111115
fn get_nonce(
@@ -126,10 +130,9 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidatorTra
126130
&mut self,
127131
executable_tx: &ExecutableTransaction,
128132
account_nonce: Nonce,
129-
mempool_client: SharedMempoolClient,
130-
runtime: tokio::runtime::Handle,
133+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
131134
) -> StatefulTransactionValidatorResult<Nonce> {
132-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
135+
self.run_transaction_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
133136
Ok(account_nonce)
134137
}
135138

@@ -148,11 +151,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
148151
&mut self,
149152
executable_tx: &ExecutableTransaction,
150153
account_nonce: Nonce,
151-
mempool_client: SharedMempoolClient,
152-
runtime: tokio::runtime::Handle,
154+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
153155
) -> StatefulTransactionValidatorResult<()> {
154156
self.validate_state_preconditions(executable_tx, account_nonce)?;
155-
self.run_validate_entry_point(executable_tx, account_nonce, mempool_client, runtime)?;
157+
self.run_validate_entry_point(executable_tx, account_nonce, is_account_tx_in_mempool)?;
156158
Ok(())
157159
}
158160

@@ -215,11 +217,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
215217
&mut self,
216218
executable_tx: &ExecutableTransaction,
217219
account_nonce: Nonce,
218-
mempool_client: SharedMempoolClient,
219-
runtime: tokio::runtime::Handle,
220+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
220221
) -> StatefulTransactionValidatorResult<()> {
221222
let skip_validate =
222-
skip_stateful_validations(executable_tx, account_nonce, mempool_client, runtime)?;
223+
skip_stateful_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
223224
let only_query = false;
224225
let charge_fee = enforce_fee(executable_tx, only_query);
225226
let strict_nonce_check = false;
@@ -290,8 +291,7 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
290291
fn skip_stateful_validations(
291292
tx: &ExecutableTransaction,
292293
account_nonce: Nonce,
293-
mempool_client: SharedMempoolClient,
294-
runtime: tokio::runtime::Handle,
294+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
295295
) -> StatefulTransactionValidatorResult<bool> {
296296
if let ExecutableTransaction::Invoke(ExecutableInvokeTransaction { tx, .. }) = tx {
297297
// check if the transaction nonce is 1, meaning it is post deploy_account, and the
@@ -303,8 +303,7 @@ fn skip_stateful_validations(
303303
// to check if the account exists in the mempool since it means that either it has a
304304
// deploy_account transaction or transactions with future nonces that passed
305305
// validations.
306-
return runtime
307-
.block_on(mempool_client.account_tx_in_pool_or_recent_block(tx.sender_address()))
306+
return is_account_tx_in_mempool
308307
.map_err(|err| mempool_client_err_to_deprecated_gw_err(&tx.signature(), err))
309308
.inspect(|exists| {
310309
if *exists {

0 commit comments

Comments
 (0)