Skip to content

Commit 14142cc

Browse files
apollo_gateway: extract async mempool query from blocking task
1 parent 677edc2 commit 14142cc

File tree

4 files changed

+112
-99
lines changed

4 files changed

+112
-99
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ use apollo_gateway_types::gateway_types::{
1919
InvokeGatewayOutput,
2020
};
2121
use apollo_infra::component_definitions::ComponentStarter;
22-
use apollo_mempool_types::communication::{AddTransactionArgsWrapper, SharedMempoolClient};
22+
use apollo_mempool_types::communication::{
23+
AddTransactionArgsWrapper,
24+
MempoolClientError,
25+
SharedMempoolClient,
26+
};
2327
use apollo_mempool_types::mempool_types::AddTransactionArgs;
2428
use apollo_network_types::network_types::BroadcastedMessageMetadata;
2529
use apollo_proc_macros::sequencer_latency_histogram;
@@ -33,6 +37,7 @@ use starknet_api::rpc_transaction::{
3337
RpcDeclareTransaction,
3438
RpcTransaction,
3539
};
40+
use starknet_types_core::felt::Felt;
3641
use tracing::{debug, info, warn, Span};
3742

3843
use crate::errors::{
@@ -151,32 +156,49 @@ impl Gateway {
151156
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
152157
})?;
153158

154-
let mut blocking_task =
155-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current())
156-
.await
157-
.map_err(|e| {
158-
info!(
159-
"Gateway validation failed for tx with signature: {:?} with error: {}",
160-
&tx_signature, e
161-
);
162-
metric_counters.record_add_tx_failure(&e);
163-
e
164-
})?;
159+
let mut blocking_task = ProcessTxBlockingTask::new(
160+
self,
161+
executable_tx.clone(),
162+
tokio::runtime::Handle::current(),
163+
)
164+
.await
165+
.map_err(|e| {
166+
info!(
167+
"Gateway validation failed for tx with signature: {:?} with error: {}",
168+
&tx_signature, e
169+
);
170+
metric_counters.record_add_tx_failure(&e);
171+
e
172+
})?;
165173

166174
let state_reader = blocking_task.get_state_reader().await?;
167175

168176
let account_address = blocking_task.executable_tx.contract_address();
169-
let account_nonce =
170-
state_reader.get_account_nonce(account_address).await.map_err(|e| {
171-
StarknetError::internal_with_logging("Failed to get account nonce", e)
172-
})?;
177+
let account_nonce = state_reader
178+
.get_account_nonce(account_address)
179+
.await
180+
.map_err(|e| StarknetError::internal_with_logging("Failed to get account nonce", e))?;
181+
182+
let is_account_tx_in_mempool =
183+
if should_query_mempool_for_account_tx(&executable_tx, account_nonce) {
184+
self.mempool_client
185+
.account_tx_in_pool_or_recent_block(executable_tx.contract_address())
186+
.await
187+
} else {
188+
Ok(false)
189+
};
173190

174191
let stateful_tx_validator = blocking_task.create_stateful_validator(state_reader).await?;
175192

176-
// Run the blocking task in the current span.
177193
let curr_span = Span::current();
178194
let handle = tokio::task::spawn_blocking(move || {
179-
curr_span.in_scope(|| blocking_task.process_tx(account_nonce, stateful_tx_validator))
195+
curr_span.in_scope(|| {
196+
blocking_task.process_tx(
197+
account_nonce,
198+
stateful_tx_validator,
199+
is_account_tx_in_mempool,
200+
)
201+
})
180202
});
181203
let handle_result = handle.await;
182204
let nonce = match handle_result {
@@ -249,12 +271,22 @@ impl Gateway {
249271
}
250272
}
251273

274+
// Returns true if we should prefetch mempool data to potentially skip stateful validations:
275+
// applies only to Invoke with nonce == 1 and when the account nonce is zero (pre-deploy state).
276+
fn should_query_mempool_for_account_tx(
277+
executable_tx: &AccountTransaction,
278+
account_nonce: Nonce,
279+
) -> bool {
280+
matches!(executable_tx, AccountTransaction::Invoke(_))
281+
&& executable_tx.nonce() == Nonce(Felt::ONE)
282+
&& account_nonce == Nonce(Felt::ZERO)
283+
}
284+
252285
/// CPU-intensive transaction processing, spawned in a blocking thread to avoid blocking other tasks
253286
/// from running.
254287
struct ProcessTxBlockingTask {
255288
state_reader_factory: Arc<dyn StateReaderFactory>,
256289
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
257-
mempool_client: SharedMempoolClient,
258290
executable_tx: AccountTransaction,
259291
runtime: tokio::runtime::Handle,
260292
}
@@ -268,23 +300,22 @@ impl ProcessTxBlockingTask {
268300
Ok(Self {
269301
state_reader_factory: gateway.state_reader_factory.clone(),
270302
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
271-
mempool_client: gateway.mempool_client.clone(),
272303
executable_tx,
273304
runtime,
274305
})
275306
}
276307

277308
pub async fn get_state_reader(&self) -> GatewayResult<Box<dyn MempoolStateReader>> {
278309
let state_reader = self
279-
.stateful_tx_validator_factory
280-
.get_state_reader_for_validation(self.state_reader_factory.as_ref())
281-
.await
282-
.map_err(|e| {
283-
StarknetError::internal_with_logging(
284-
"Failed to get state reader from latest block",
285-
e,
286-
)
287-
})?;
310+
.stateful_tx_validator_factory
311+
.get_state_reader_for_validation(self.state_reader_factory.as_ref())
312+
.await
313+
.map_err(|e| {
314+
StarknetError::internal_with_logging(
315+
"Failed to get state reader from latest block",
316+
e,
317+
)
318+
})?;
288319
Ok(state_reader)
289320
}
290321

@@ -307,12 +338,12 @@ impl ProcessTxBlockingTask {
307338
self,
308339
account_nonce: Nonce,
309340
mut stateful_tx_validator: Box<dyn StatefulTransactionValidatorTrait>,
341+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
310342
) -> GatewayResult<Nonce> {
311343
let nonce = stateful_tx_validator.extract_state_nonce_and_run_validations(
312344
&self.executable_tx,
313345
account_nonce,
314-
self.mempool_client,
315-
self.runtime,
346+
is_account_tx_in_mempool,
316347
)?;
317348

318349
Ok(nonce)

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTr
117117
mock_stateless_transaction_validator
118118
}
119119

120+
#[fixture]
121+
fn default_mock_mempool_client() -> MockMempoolClient {
122+
let mut mock_mempool_client = MockMempoolClient::new();
123+
// Gateway tests don't require the mempool client to be checked, any value is fine.
124+
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| Ok(false));
125+
mock_mempool_client
126+
}
127+
120128
#[fixture]
121129
fn mock_dependencies() -> MockDependencies {
122130
let config = GatewayConfig {
@@ -128,7 +136,7 @@ fn mock_dependencies() -> MockDependencies {
128136
};
129137
let state_reader_factory =
130138
local_test_state_reader_factory(CairoVersion::Cairo1(RunnableCairo1::Casm), true);
131-
let mock_mempool_client = MockMempoolClient::new();
139+
let mock_mempool_client = default_mock_mempool_client();
132140
let mock_transaction_converter = MockTransactionConverterTrait::new();
133141
let mock_stateless_transaction_validator = mock_stateless_transaction_validator();
134142
MockDependencies {
@@ -316,11 +324,11 @@ async fn run_add_tx_and_extract_metrics(
316324
async fn process_tx_task(
317325
stateful_tx_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
318326
) -> ProcessTxBlockingTask {
319-
let state_reader_factory = Arc::new(MockStateReaderFactory::new());
327+
let state_reader_factory: Arc<dyn crate::state_reader::StateReaderFactory> =
328+
Arc::new(MockStateReaderFactory::new());
320329
ProcessTxBlockingTask {
321330
state_reader_factory,
322331
stateful_tx_validator_factory: Arc::new(stateful_tx_validator_factory),
323-
mempool_client: Arc::new(MockMempoolClient::new()),
324332
executable_tx: executable_invoke_tx(invoke_args()),
325333
runtime: tokio::runtime::Handle::current(),
326334
}
@@ -560,12 +568,16 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
560568

561569
mock_stateful_transaction_validator
562570
.expect_extract_state_nonce_and_run_validations()
563-
.return_once(|_, _, _, _| Err(expected_error));
571+
.return_once(|_, _, _| Err(expected_error));
564572

565573
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory).await;
566-
let account_nonce = nonce!(0);
574+
567575
let result = tokio::task::spawn_blocking(move || {
568-
process_tx_task.process_tx(account_nonce, Box::new(mock_stateful_transaction_validator))
576+
process_tx_task.process_tx(
577+
nonce!(0),
578+
Box::new(mock_stateful_transaction_validator),
579+
Ok(false),
580+
)
569581
})
570582
.await
571583
.unwrap();
@@ -601,21 +613,22 @@ async fn add_tx_returns_error_when_instantiating_validator_fails(
601613
mut mock_dependencies: MockDependencies,
602614
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
603615
) {
604-
// Prepare transaction conversion to reach instantiation.
605616
let tx_args = invoke_args();
606617
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
607618

608-
// Fail validator instantiation.
609619
let error_code = StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into());
610620
let expected_error = StarknetError { code: error_code.clone(), message: "placeholder".into() };
611621
mock_stateful_transaction_validator_factory
612622
.expect_get_state_reader_for_validation()
613623
.return_once(|_| Box::pin(async { Err::<_, _>(expected_error) }));
614624

615-
// Build gateway and inject the failing factory.
616625
let mut gateway = mock_dependencies.gateway();
617626
gateway.stateful_tx_validator_factory = Arc::new(mock_stateful_transaction_validator_factory);
618627

619628
let err = gateway.add_tx(tx_args.get_rpc_tx(), p2p_message_metadata()).await.unwrap_err();
620629
assert_eq!(err.code, error_code);
621630
}
631+
632+
// With current implementation, validator instantiation occurs inside ProcessTxBlockingTask::new()
633+
// and unwraps on error. Therefore, add_tx will panic if instantiation fails.
634+
// Gateway should return an error (not panic) when validator instantiation fails.

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use apollo_gateway_types::deprecated_gateway_error::{
55
StarknetErrorCode,
66
};
77
use apollo_gateway_types::errors::GatewaySpecError;
8-
use apollo_mempool_types::communication::SharedMempoolClient;
8+
use apollo_mempool_types::communication::MempoolClientError;
99
use apollo_proc_macros::sequencer_latency_histogram;
1010
use async_trait::async_trait;
1111
use blockifier::blockifier::stateful_validator::{
@@ -134,8 +134,7 @@ pub trait StatefulTransactionValidatorTrait: Send {
134134
&mut self,
135135
executable_tx: &ExecutableTransaction,
136136
account_nonce: Nonce,
137-
mempool_client: SharedMempoolClient,
138-
runtime: tokio::runtime::Handle,
137+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
139138
) -> StatefulTransactionValidatorResult<Nonce>;
140139

141140
fn get_nonce(
@@ -156,10 +155,9 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidatorTra
156155
&mut self,
157156
executable_tx: &ExecutableTransaction,
158157
account_nonce: Nonce,
159-
mempool_client: SharedMempoolClient,
160-
runtime: tokio::runtime::Handle,
158+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
161159
) -> StatefulTransactionValidatorResult<Nonce> {
162-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
160+
self.run_transaction_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
163161
Ok(account_nonce)
164162
}
165163

@@ -178,11 +176,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
178176
&mut self,
179177
executable_tx: &ExecutableTransaction,
180178
account_nonce: Nonce,
181-
mempool_client: SharedMempoolClient,
182-
runtime: tokio::runtime::Handle,
179+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
183180
) -> StatefulTransactionValidatorResult<()> {
184181
self.validate_state_preconditions(executable_tx, account_nonce)?;
185-
self.run_validate_entry_point(executable_tx, account_nonce, mempool_client, runtime)?;
182+
self.run_validate_entry_point(executable_tx, account_nonce, is_account_tx_in_mempool)?;
186183
Ok(())
187184
}
188185

@@ -253,11 +250,10 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
253250
&mut self,
254251
executable_tx: &ExecutableTransaction,
255252
account_nonce: Nonce,
256-
mempool_client: SharedMempoolClient,
257-
runtime: tokio::runtime::Handle,
253+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
258254
) -> StatefulTransactionValidatorResult<()> {
259255
let skip_validate =
260-
skip_stateful_validations(executable_tx, account_nonce, mempool_client, runtime)?;
256+
skip_stateful_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
261257
let only_query = false;
262258
let charge_fee = enforce_fee(executable_tx, only_query);
263259
let strict_nonce_check = false;
@@ -335,8 +331,7 @@ impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidator<B>
335331
fn skip_stateful_validations(
336332
tx: &ExecutableTransaction,
337333
account_nonce: Nonce,
338-
mempool_client: SharedMempoolClient,
339-
runtime: tokio::runtime::Handle,
334+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
340335
) -> StatefulTransactionValidatorResult<bool> {
341336
if let ExecutableTransaction::Invoke(ExecutableInvokeTransaction { tx, .. }) = tx {
342337
// check if the transaction nonce is 1, meaning it is post deploy_account, and the
@@ -348,8 +343,7 @@ fn skip_stateful_validations(
348343
// to check if the account exists in the mempool since it means that either it has a
349344
// deploy_account transaction or transactions with future nonces that passed
350345
// validations.
351-
return runtime
352-
.block_on(mempool_client.account_tx_in_pool_or_recent_block(tx.sender_address()))
346+
return is_account_tx_in_mempool
353347
.map_err(|err| mempool_client_err_to_deprecated_gw_err(&tx.signature(), err))
354348
.inspect(|exists| {
355349
if *exists {

0 commit comments

Comments
 (0)