Skip to content

Commit 8349379

Browse files
apollo_gateway: extract async mempool query from blocking task
1 parent 0757cb7 commit 8349379

File tree

4 files changed

+62
-76
lines changed

4 files changed

+62
-76
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ use apollo_gateway_types::gateway_types::{
1919
InvokeGatewayOutput,
2020
};
2121
use apollo_infra::component_definitions::ComponentStarter;
22-
use apollo_mempool_types::communication::{AddTransactionArgsWrapper, SharedMempoolClient};
22+
use apollo_mempool_types::communication::{
23+
AddTransactionArgsWrapper,
24+
MempoolClientError,
25+
SharedMempoolClient,
26+
};
2327
use apollo_mempool_types::mempool_types::AddTransactionArgs;
2428
use apollo_network_types::network_types::BroadcastedMessageMetadata;
2529
use apollo_proc_macros::sequencer_latency_histogram;
@@ -150,8 +154,15 @@ impl Gateway {
150154
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
151155
})?;
152156

157+
// Prefetch mempool result in async context to avoid blocking-on-async in a blocking
158+
// thread.
159+
let is_account_tx_in_mempool = self
160+
.mempool_client
161+
.account_tx_in_pool_or_recent_block(executable_tx.contract_address())
162+
.await;
163+
153164
let blocking_task =
154-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current());
165+
ProcessTxBlockingTask::new(self, executable_tx, is_account_tx_in_mempool);
155166
// Run the blocking task in the current span.
156167
let curr_span = Span::current();
157168
let handle =
@@ -235,23 +246,21 @@ impl Gateway {
235246
struct ProcessTxBlockingTask {
236247
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
237248
state_reader_factory: Arc<dyn StateReaderFactory>,
238-
mempool_client: SharedMempoolClient,
239249
executable_tx: AccountTransaction,
240-
runtime: tokio::runtime::Handle,
250+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
241251
}
242252

243253
impl ProcessTxBlockingTask {
244254
pub fn new(
245255
gateway: &Gateway,
246256
executable_tx: AccountTransaction,
247-
runtime: tokio::runtime::Handle,
257+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
248258
) -> Self {
249259
Self {
250260
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
251261
state_reader_factory: gateway.state_reader_factory.clone(),
252-
mempool_client: gateway.mempool_client.clone(),
253262
executable_tx,
254-
runtime,
263+
is_account_tx_in_mempool,
255264
}
256265
}
257266

@@ -262,8 +271,7 @@ impl ProcessTxBlockingTask {
262271

263272
let nonce = stateful_transaction_validator.extract_state_nonce_and_run_validations(
264273
&self.executable_tx,
265-
self.mempool_client,
266-
self.runtime,
274+
self.is_account_tx_in_mempool,
267275
)?;
268276

269277
Ok(nonce)

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTr
117117
mock_stateless_transaction_validator
118118
}
119119

120+
#[fixture]
121+
fn default_mock_mempool_client() -> MockMempoolClient {
122+
let mut mock = MockMempoolClient::new();
123+
// TODO(itamars): Add test which mempool is checked.
124+
mock.expect_account_tx_in_pool_or_recent_block()
125+
.returning(|_| Err(MempoolClientError::MempoolError(MempoolError::MempoolFull)));
126+
mock
127+
}
128+
120129
#[fixture]
121130
fn mock_dependencies() -> MockDependencies {
122131
let config = GatewayConfig {
@@ -128,7 +137,7 @@ fn mock_dependencies() -> MockDependencies {
128137
};
129138
let state_reader_factory =
130139
local_test_state_reader_factory(CairoVersion::Cairo1(RunnableCairo1::Casm), true);
131-
let mock_mempool_client = MockMempoolClient::new();
140+
let mock_mempool_client = default_mock_mempool_client();
132141
let mock_transaction_converter = MockTransactionConverterTrait::new();
133142
let mock_stateless_transaction_validator = mock_stateless_transaction_validator();
134143
MockDependencies {
@@ -315,13 +324,13 @@ async fn run_add_tx_and_extract_metrics(
315324

316325
fn process_tx_task(
317326
stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
327+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
318328
) -> ProcessTxBlockingTask {
319329
ProcessTxBlockingTask {
320330
stateful_tx_validator_factory: Arc::new(stateful_transaction_validator_factory),
321331
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
322-
mempool_client: Arc::new(MockMempoolClient::new()),
323332
executable_tx: executable_invoke_tx(invoke_args()),
324-
runtime: tokio::runtime::Handle::current(),
333+
is_account_tx_in_mempool,
325334
}
326335
}
327336

@@ -559,13 +568,13 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
559568

560569
mock_stateful_transaction_validator
561570
.expect_extract_state_nonce_and_run_validations()
562-
.return_once(|_, _, _| Err(expected_error));
571+
.return_once(|_, _| Err(expected_error));
563572

564573
mock_stateful_transaction_validator_factory
565574
.expect_instantiate_validator()
566575
.return_once(|_| Ok(Box::new(mock_stateful_transaction_validator)));
567576

568-
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory);
577+
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory, Ok(false));
569578

570579
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap();
571580

@@ -608,7 +617,7 @@ async fn process_tx_returns_error_when_instantiating_validator_fails(
608617
.expect_instantiate_validator()
609618
.return_once(|_| Err(expected_error));
610619

611-
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory);
620+
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory, Ok(false));
612621

613622
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap();
614623

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use apollo_gateway_types::deprecated_gateway_error::{
55
StarknetErrorCode,
66
};
77
use apollo_gateway_types::errors::GatewaySpecError;
8-
use apollo_mempool_types::communication::SharedMempoolClient;
8+
use apollo_mempool_types::communication::MempoolClientError;
99
use apollo_proc_macros::sequencer_latency_histogram;
1010
use blockifier::blockifier::stateful_validator::{
1111
StatefulValidator,
@@ -103,8 +103,7 @@ pub trait StatefulTransactionValidatorTrait {
103103
fn extract_state_nonce_and_run_validations(
104104
&mut self,
105105
executable_tx: &ExecutableTransaction,
106-
mempool_client: SharedMempoolClient,
107-
runtime: tokio::runtime::Handle,
106+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
108107
) -> StatefulTransactionValidatorResult<Nonce>;
109108
}
110109

@@ -119,8 +118,7 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidatorTrait
119118
fn extract_state_nonce_and_run_validations(
120119
&mut self,
121120
executable_tx: &ExecutableTransaction,
122-
mempool_client: SharedMempoolClient,
123-
runtime: tokio::runtime::Handle,
121+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
124122
) -> StatefulTransactionValidatorResult<Nonce> {
125123
let address = executable_tx.contract_address();
126124
let account_nonce =
@@ -132,7 +130,7 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidatorTrait
132130
e,
133131
)
134132
})?;
135-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
133+
self.run_transaction_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
136134
Ok(account_nonce)
137135
}
138136
}
@@ -142,11 +140,10 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidator<B> {
142140
&mut self,
143141
executable_tx: &ExecutableTransaction,
144142
account_nonce: Nonce,
145-
mempool_client: SharedMempoolClient,
146-
runtime: tokio::runtime::Handle,
143+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
147144
) -> StatefulTransactionValidatorResult<()> {
148145
self.validate_state_preconditions(executable_tx, account_nonce)?;
149-
self.run_validate_entry_point(executable_tx, account_nonce, mempool_client, runtime)?;
146+
self.run_validate_entry_point(executable_tx, account_nonce, is_account_tx_in_mempool)?;
150147
Ok(())
151148
}
152149

@@ -209,11 +206,10 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidator<B> {
209206
&mut self,
210207
executable_tx: &ExecutableTransaction,
211208
account_nonce: Nonce,
212-
mempool_client: SharedMempoolClient,
213-
runtime: tokio::runtime::Handle,
209+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
214210
) -> StatefulTransactionValidatorResult<()> {
215211
let skip_validate =
216-
skip_stateful_validations(executable_tx, account_nonce, mempool_client, runtime)?;
212+
skip_stateful_validations(executable_tx, account_nonce, is_account_tx_in_mempool)?;
217213
let only_query = false;
218214
let charge_fee = enforce_fee(executable_tx, only_query);
219215
let strict_nonce_check = false;
@@ -284,8 +280,7 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidator<B> {
284280
fn skip_stateful_validations(
285281
tx: &ExecutableTransaction,
286282
account_nonce: Nonce,
287-
mempool_client: SharedMempoolClient,
288-
runtime: tokio::runtime::Handle,
283+
is_account_tx_in_mempool: Result<bool, MempoolClientError>,
289284
) -> StatefulTransactionValidatorResult<bool> {
290285
if let ExecutableTransaction::Invoke(ExecutableInvokeTransaction { tx, .. }) = tx {
291286
// check if the transaction nonce is 1, meaning it is post deploy_account, and the
@@ -297,8 +292,7 @@ fn skip_stateful_validations(
297292
// to check if the account exists in the mempool since it means that either it has a
298293
// deploy_account transaction or transactions with future nonces that passed
299294
// validations.
300-
return runtime
301-
.block_on(mempool_client.account_tx_in_pool_or_recent_block(tx.sender_address()))
295+
return is_account_tx_in_mempool
302296
.map_err(|err| mempool_client_err_to_deprecated_gw_err(&tx.signature(), err))
303297
.inspect(|exists| {
304298
if *exists {

crates/apollo_gateway/src/stateful_transaction_validator_test.rs

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use std::sync::Arc;
2-
31
use apollo_gateway_config::config::StatefulTransactionValidatorConfig;
42
use apollo_gateway_types::deprecated_gateway_error::{
53
KnownStarknetErrorCode,
64
StarknetError,
75
StarknetErrorCode,
86
};
9-
use apollo_mempool_types::communication::MockMempoolClient;
107
use blockifier::blockifier::stateful_validator::{
118
MockStatefulValidatorTrait as MockBlockifierStatefulValidatorTrait,
129
StatefulValidatorError as BlockifierStatefulValidatorError,
@@ -59,26 +56,17 @@ async fn test_get_nonce_fail_on_extract_state_nonce_and_run_validations() {
5956
)))
6057
});
6158

62-
let mut mock_mempool_client = MockMempoolClient::new();
63-
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| {
64-
// The mempool does not have any transactions from the sender.
65-
Ok(false)
66-
});
67-
let mempool_client = Arc::new(mock_mempool_client);
68-
let runtime = tokio::runtime::Handle::current();
69-
7059
let mut stateful_validator = StatefulTransactionValidator {
7160
config: StatefulTransactionValidatorConfig::default(),
7261
blockifier_stateful_tx_validator: mock_blockifier_validator,
7362
};
7463

64+
// The test fails before the mempool is checked, so we fix it to Ok(false).
65+
let is_account_tx_in_mempool = Ok(false);
7566
let executable_tx = create_executable_invoke_tx(CairoVersion::Cairo1(RunnableCairo1::Casm));
7667
let result = tokio::task::spawn_blocking(move || {
77-
stateful_validator.extract_state_nonce_and_run_validations(
78-
&executable_tx,
79-
mempool_client,
80-
runtime,
81-
)
68+
stateful_validator
69+
.extract_state_nonce_and_run_validations(&executable_tx, is_account_tx_in_mempool)
8270
})
8371
.await
8472
.unwrap();
@@ -131,25 +119,16 @@ async fn test_extract_state_nonce_and_run_validations(
131119
mock_blockifier_validator.expect_validate().return_once(|_| expected_result.map(|_| ()));
132120
mock_blockifier_validator.expect_block_info().return_const(BlockInfo::default());
133121

134-
let mut mock_mempool_client = MockMempoolClient::new();
135-
mock_mempool_client.expect_account_tx_in_pool_or_recent_block().returning(|_| {
136-
// The mempool does not have any transactions from the sender.
137-
Ok(false)
138-
});
139-
let mempool_client = Arc::new(mock_mempool_client);
140-
let runtime = tokio::runtime::Handle::current();
141-
142122
let mut stateful_validator = StatefulTransactionValidator {
143123
config: StatefulTransactionValidatorConfig::default(),
144124
blockifier_stateful_tx_validator: mock_blockifier_validator,
145125
};
146126

127+
// Mempool is checked only when tx nonce is 1; Not the case here, so set a fixed value.
128+
let is_account_tx_in_mempool = Ok(false);
147129
let result = tokio::task::spawn_blocking(move || {
148-
stateful_validator.extract_state_nonce_and_run_validations(
149-
&executable_tx,
150-
mempool_client,
151-
runtime,
152-
)
130+
stateful_validator
131+
.extract_state_nonce_and_run_validations(&executable_tx, is_account_tx_in_mempool)
153132
})
154133
.await
155134
.unwrap();
@@ -232,25 +211,17 @@ async fn test_skip_validate(
232211
.returning(|_| Ok(()));
233212
mock_blockifier_validator.expect_block_info().return_const(BlockInfo::default());
234213

235-
let mut mock_mempool_client = MockMempoolClient::new();
236-
mock_mempool_client
237-
.expect_account_tx_in_pool_or_recent_block()
238-
.returning(move |_| Ok(contains_tx));
239-
let mempool_client = Arc::new(mock_mempool_client);
240-
241-
let runtime = tokio::runtime::Handle::current();
242-
243214
let mut stateful_validator = StatefulTransactionValidator {
244215
config: StatefulTransactionValidatorConfig::default(),
245216
blockifier_stateful_tx_validator: mock_blockifier_validator,
246217
};
247218

219+
let is_account_tx_in_mempool = Ok(contains_tx);
248220
tokio::task::spawn_blocking(move || {
249221
let _ = stateful_validator.run_transaction_validations(
250222
&executable_tx,
251223
sender_nonce,
252-
mempool_client,
253-
runtime,
224+
is_account_tx_in_mempool,
254225
);
255226
})
256227
.await
@@ -352,12 +323,13 @@ async fn validate_resource_bounds(
352323
blockifier_stateful_tx_validator: mock_blockifier_validator,
353324
};
354325

326+
// Presence is irrelevant for resource-bound validation; fix to Ok(false) for determinism.
327+
let is_account_tx_in_mempool = Ok(false);
355328
let result = tokio::task::spawn_blocking(move || {
356329
stateful_validator.run_transaction_validations(
357330
&executable_tx,
358331
account_nonce,
359-
Arc::new(MockMempoolClient::new()),
360-
tokio::runtime::Handle::current(),
332+
is_account_tx_in_mempool,
361333
)
362334
})
363335
.await
@@ -401,12 +373,14 @@ async fn test_is_valid_nonce(
401373
nonce: tx_nonce,
402374
resource_bounds: ValidResourceBounds::create_for_testing(),
403375
));
376+
377+
// Mempool is checked only when account nonce is 0; Not the case here, so set a fixed value.
378+
let is_account_tx_in_mempool = Ok(false);
404379
let result = tokio::task::spawn_blocking(move || {
405380
stateful_validator.run_transaction_validations(
406381
&executable_tx,
407382
account_nonce,
408-
Arc::new(MockMempoolClient::new()),
409-
tokio::runtime::Handle::current(),
383+
is_account_tx_in_mempool,
410384
)
411385
})
412386
.await
@@ -449,12 +423,13 @@ async fn test_reject_future_declares(
449423
blockifier_stateful_tx_validator: mock_blockifier_validator,
450424
};
451425

426+
// Mempool is checked only when account nonce is 0; Not the case here, so set a fixed value.
427+
let is_account_tx_in_mempool = Ok(false);
452428
let result = tokio::task::spawn_blocking(move || {
453429
stateful_validator.run_transaction_validations(
454430
&executable_tx,
455431
nonce!(account_nonce),
456-
Arc::new(MockMempoolClient::new()),
457-
tokio::runtime::Handle::current(),
432+
is_account_tx_in_mempool,
458433
)
459434
})
460435
.await

0 commit comments

Comments
 (0)