Skip to content

perf(core): make mempool use a dashmap #2536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Perf

### 2025-04-25

- Make mempool use a dashmap instead of a Hashmap behind a RwLock [2536](https://github.com/lambdaclass/ethrex/pull/2536)

### 2025-04-22

- Avoid calculating state transitions after every block in bulk mode [2519](https://github.com/lambdaclass/ethrex/pull/2519)
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ secp256k1 = { version = "0.29.1", default-features = false, features = [
keccak-hash = "0.11.0"
axum = "0.8.1"
libsecp256k1 = "0.7.2"
dashmap = { version = "6.1.0", features = ["inline"]}
1 change: 1 addition & 0 deletions crates/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ thiserror.workspace = true
sha3.workspace = true
tracing.workspace = true
bytes.workspace = true
dashmap.workspace = true
cfg-if = "1.0.0"

k256 = { version = "0.13.3", features = ["ecdh"] }
Expand Down
8 changes: 4 additions & 4 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ impl Blockchain {
// Add transaction and blobs bundle to storage
let hash = transaction.compute_hash();
self.mempool
.add_transaction(hash, MempoolTransaction::new(transaction, sender))?;
self.mempool.add_blobs_bundle(hash, blobs_bundle)?;
.add_transaction(hash, MempoolTransaction::new(transaction, sender));
self.mempool.add_blobs_bundle(hash, blobs_bundle);
Ok(hash)
}

Expand All @@ -353,13 +353,13 @@ impl Blockchain {

// Add transaction to storage
self.mempool
.add_transaction(hash, MempoolTransaction::new(transaction, sender))?;
.add_transaction(hash, MempoolTransaction::new(transaction, sender));

Ok(hash)
}

/// Remove a transaction from the mempool
pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> {
pub fn remove_transaction_from_pool(&self, hash: &H256) {
self.mempool.remove_transaction(hash)
}

Expand Down
121 changes: 35 additions & 86 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
collections::{HashMap, HashSet},
sync::{Mutex, RwLock},
};
use std::collections::{HashMap, HashSet};

use crate::{
constants::{
Expand All @@ -11,86 +8,52 @@ use crate::{
},
error::MempoolError,
};
use dashmap::DashMap;
use ethrex_common::{
types::{BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType},
Address, H256, U256,
};
use ethrex_storage::error::StoreError;

#[derive(Debug, Default)]
pub struct Mempool {
transaction_pool: RwLock<HashMap<H256, MempoolTransaction>>,
blobs_bundle_pool: Mutex<HashMap<H256, BlobsBundle>>,
transaction_pool: DashMap<H256, MempoolTransaction>,
blobs_bundle_pool: DashMap<H256, BlobsBundle>,
}
impl Mempool {
pub fn new() -> Self {
Self::default()
}

/// Add transaction to the pool without doing validity checks
pub fn add_transaction(
&self,
hash: H256,
transaction: MempoolTransaction,
) -> Result<(), StoreError> {
let mut tx_pool = self
.transaction_pool
.write()
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?;
tx_pool.insert(hash, transaction);

Ok(())
pub fn add_transaction(&self, hash: H256, transaction: MempoolTransaction) {
self.transaction_pool.insert(hash, transaction);
}

/// Add a blobs bundle to the pool by its blob transaction hash
pub fn add_blobs_bundle(
&self,
tx_hash: H256,
blobs_bundle: BlobsBundle,
) -> Result<(), StoreError> {
self.blobs_bundle_pool
.lock()
.map_err(|error| StoreError::Custom(error.to_string()))?
.insert(tx_hash, blobs_bundle);
Ok(())
pub fn add_blobs_bundle(&self, tx_hash: H256, blobs_bundle: BlobsBundle) {
self.blobs_bundle_pool.insert(tx_hash, blobs_bundle);
}

/// Get a blobs bundle to the pool given its blob transaction hash
pub fn get_blobs_bundle(&self, tx_hash: H256) -> Result<Option<BlobsBundle>, StoreError> {
Ok(self
.blobs_bundle_pool
.lock()
.map_err(|error| StoreError::Custom(error.to_string()))?
.get(&tx_hash)
.cloned())
pub fn get_blobs_bundle(&self, tx_hash: H256) -> Option<BlobsBundle> {
self.blobs_bundle_pool.get(&tx_hash).as_deref().cloned()
}

/// Remove a transaction from the pool
pub fn remove_transaction(&self, hash: &H256) -> Result<(), StoreError> {
let mut tx_pool = self
.transaction_pool
.write()
.map_err(|error| StoreError::MempoolWriteLock(error.to_string()))?;
if let Some(tx) = tx_pool.get(hash) {
pub fn remove_transaction(&self, hash: &H256) {
if let Some((_, tx)) = self.transaction_pool.remove(hash) {
if matches!(tx.tx_type(), TxType::EIP4844) {
self.blobs_bundle_pool
.lock()
.map_err(|error| StoreError::Custom(error.to_string()))?
.remove(&tx.compute_hash());
self.blobs_bundle_pool.remove(&tx.compute_hash());
}

tx_pool.remove(hash);
};

Ok(())
}

/// Applies the filter and returns a set of suitable transactions from the mempool.
/// These transactions will be grouped by sender and sorted by nonce
pub fn filter_transactions(
&self,
filter: &PendingTxFilter,
) -> Result<HashMap<Address, Vec<MempoolTransaction>>, StoreError> {
) -> HashMap<Address, Vec<MempoolTransaction>> {
let filter_tx = |tx: &Transaction| -> bool {
// Filter by tx type
let is_blob_tx = matches!(tx, Transaction::EIP4844Transaction(_));
Expand Down Expand Up @@ -129,14 +92,11 @@ impl Mempool {
pub fn filter_transactions_with_filter_fn(
&self,
filter: &dyn Fn(&Transaction) -> bool,
) -> Result<HashMap<Address, Vec<MempoolTransaction>>, StoreError> {
) -> HashMap<Address, Vec<MempoolTransaction>> {
let mut txs_by_sender: HashMap<Address, Vec<MempoolTransaction>> = HashMap::new();
let tx_pool = self
.transaction_pool
.read()
.map_err(|error| StoreError::MempoolReadLock(error.to_string()))?;

for (_, tx) in tx_pool.iter() {
for item in self.transaction_pool.iter() {
let tx = item.value();
if filter(tx) {
txs_by_sender
.entry(tx.sender())
Expand All @@ -146,42 +106,33 @@ impl Mempool {
}

txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort());
Ok(txs_by_sender)
txs_by_sender
}

/// Gets hashes from possible_hashes that are not already known in the mempool.
pub fn filter_unknown_transactions(
&self,
possible_hashes: &[H256],
) -> Result<Vec<H256>, StoreError> {
let tx_pool = self
pub fn filter_unknown_transactions(&self, possible_hashes: &[H256]) -> Vec<H256> {
let tx_set: HashSet<_> = self
.transaction_pool
.read()
.map_err(|error| StoreError::MempoolReadLock(error.to_string()))?;

let tx_set: HashSet<_> = tx_pool.iter().map(|(hash, _)| hash).collect();
Ok(possible_hashes
.iter()
.map(|item| *item.key())
.collect();
possible_hashes
.iter()
.filter(|hash| !tx_set.contains(hash))
.copied()
.collect())
.collect()
}

pub fn get_transaction_by_hash(
&self,
transaction_hash: H256,
) -> Result<Option<Transaction>, StoreError> {
pub fn get_transaction_by_hash(&self, transaction_hash: H256) -> Option<Transaction> {
let tx = self
.transaction_pool
.read()
.map_err(|error| StoreError::MempoolReadLock(error.to_string()))?
.get(&transaction_hash)
.map(|e| e.clone().into());
.map(|e| e.value().clone().into());

Ok(tx)
tx
}

pub fn get_nonce(&self, address: &Address) -> Result<Option<u64>, MempoolError> {
pub fn get_nonce(&self, address: &Address) -> Option<u64> {
let pending_filter = PendingTxFilter {
min_tip: None,
base_fee: None,
Expand All @@ -190,13 +141,13 @@ impl Mempool {
only_blob_txs: false,
};

let pending_txs = self.filter_transactions(&pending_filter)?;
let pending_txs = self.filter_transactions(&pending_filter);
let nonce = match pending_txs.get(address) {
Some(txs) => txs.last().map(|tx| tx.nonce() + 1),
None => None,
};

Ok(nonce)
nonce
}
}

Expand Down Expand Up @@ -647,11 +598,9 @@ mod tests {
let mempool = Mempool::new();
let filter =
|tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
mempool
.add_transaction(blob_tx_hash, blob_tx.clone())
.unwrap();
mempool.add_transaction(plain_tx_hash, plain_tx).unwrap();
let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap();
mempool.add_transaction(blob_tx_hash, blob_tx.clone());
mempool.add_transaction(plain_tx_hash, plain_tx);
let txs = mempool.filter_transactions_with_filter_fn(&filter);
assert_eq!(txs, HashMap::from([(blob_tx.sender(), vec![blob_tx])]));
}

Expand All @@ -669,7 +618,7 @@ mod tests {
commitments: commitments.to_vec(),
proofs: proofs.to_vec(),
};
mempool.add_blobs_bundle(H256::random(), bundle).unwrap();
mempool.add_blobs_bundle(H256::random(), bundle);
}
}
}
15 changes: 10 additions & 5 deletions crates/blockchain/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ impl Blockchain {
Ok((
// Plain txs
TransactionQueue::new(
self.mempool.filter_transactions(&plain_tx_filter)?,
self.mempool.filter_transactions(&plain_tx_filter),
context.base_fee_per_gas(),
)?,
// Blob txs
TransactionQueue::new(
self.mempool.filter_transactions(&blob_tx_filter)?,
self.mempool.filter_transactions(&blob_tx_filter),
context.base_fee_per_gas(),
)?,
))
Expand All @@ -354,6 +354,11 @@ impl Blockchain {
debug!("Fetching transactions from mempool");
// Fetch mempool transactions
let (mut plain_txs, mut blob_txs) = self.fetch_mempool_transactions(context)?;
debug!(
"Fetched {} plain txs and {} blob txs",
plain_txs.txs.len(),
blob_txs.txs.len()
);
// Execute and add transactions to payload (if suitable)
loop {
// Check if we have enough gas to run more transactions
Expand Down Expand Up @@ -401,7 +406,7 @@ impl Blockchain {
// Pull transaction from the mempool
debug!("Ignoring replay-protected transaction: {}", tx_hash);
txs.pop();
self.remove_transaction_from_pool(&head_tx.tx.compute_hash())?;
self.remove_transaction_from_pool(&head_tx.tx.compute_hash());
continue;
}

Expand All @@ -415,7 +420,7 @@ impl Blockchain {
Ok(receipt) => {
txs.shift()?;
// Pull transaction from the mempool
self.remove_transaction_from_pool(&head_tx.tx.compute_hash())?;
self.remove_transaction_from_pool(&head_tx.tx.compute_hash());

metrics!(METRICS_TX.inc_tx_with_status_and_type(
MetricsTxStatus::Succeeded,
Expand Down Expand Up @@ -469,7 +474,7 @@ impl Blockchain {
.get_fork_blob_schedule(context.payload.header.timestamp)
.map(|schedule| schedule.max)
.unwrap_or_default() as usize;
let Some(blobs_bundle) = self.mempool.get_blobs_bundle(tx_hash)? else {
let Some(blobs_bundle) = self.mempool.get_blobs_bundle(tx_hash) else {
// No blob tx should enter the mempool without its blobs bundle so this is an internal error
return Err(
StoreError::Custom(format!("No blobs bundle found for blob tx {tx_hash}")).into(),
Expand Down
6 changes: 3 additions & 3 deletions crates/l2/sequencer/block_producer/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn build_payload(
Ok(context.into())
}

/// Same as `blockchain::fill_transactions` but enforces that the `StateDiff` size
/// Same as `blockchain::fill_transactions` but enforces that the `StateDiff` size
/// stays within the blob size limit after processing each transaction.
pub async fn fill_transactions(
blockchain: Arc<Blockchain>,
Expand Down Expand Up @@ -151,7 +151,7 @@ pub async fn fill_transactions(
// Pull transaction from the mempool
debug!("Ignoring replay-protected transaction: {}", tx_hash);
txs.pop();
blockchain.remove_transaction_from_pool(&head_tx.tx.compute_hash())?;
blockchain.remove_transaction_from_pool(&head_tx.tx.compute_hash());
continue;
}

Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn fill_transactions(
}
txs.shift()?;
// Pull transaction from the mempool
blockchain.remove_transaction_from_pool(&head_tx.tx.compute_hash())?;
blockchain.remove_transaction_from_pool(&head_tx.tx.compute_hash());

metrics!(METRICS_TX.inc_tx_with_status_and_type(
MetricsTxStatus::Succeeded,
Expand Down
4 changes: 2 additions & 2 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
let txs: Vec<MempoolTransaction> = self
.blockchain
.mempool
.filter_transactions_with_filter_fn(&filter)?
.filter_transactions_with_filter_fn(&filter)
.into_values()
.flatten()
.collect();
Expand Down Expand Up @@ -493,7 +493,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
{
//TODO(#1415): evaluate keeping track of requests to avoid sending the same twice.
let hashes =
new_pooled_transaction_hashes.get_transactions_to_request(&self.blockchain)?;
new_pooled_transaction_hashes.get_transactions_to_request(&self.blockchain);

//TODO(#1416): Evaluate keeping track of the request-id.
let request = GetPooledTransactions::new(random(), hashes);
Expand Down
Loading