Skip to content

feat: support directly forward transactions to sequencer #265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: scroll
Choose a base branch
from
Open
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 12 additions & 14 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1207,18 +1207,18 @@ where
debug!(target: "engine::tree", "received backfill sync finished event");
self.backfill_sync_state = BackfillSyncState::Idle;

// backfill height is the block number that the backfill finished at
let mut backfill_height = ctrl.block_number();

// Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
if let ControlFlow::Unwind { bad_block, target } = &ctrl {
let backfill_height = if let ControlFlow::Unwind { bad_block, target } = &ctrl {
warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid header
self.state.invalid_headers.insert(**bad_block);

// if this was an unwind then the target is the new height
backfill_height = Some(*target);
}
Some(*target)
} else {
// backfill height is the block number that the backfill finished at
ctrl.block_number()
};

// backfill height is the block number that the backfill finished at
let Some(backfill_height) = backfill_height else { return Ok(()) };
Expand Down Expand Up @@ -1780,20 +1780,18 @@ where
) -> Option<B256> {
let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

// check if the distance exceeds the threshold for backfill sync
let mut exceeds_backfill_threshold =
self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);

// check if the downloaded block is the tracked finalized block
if let Some(buffered_finalized) = sync_target_state
let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
.as_ref()
.and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
{
// if we have buffered the finalized block, we should check how far
// we're off
exceeds_backfill_threshold =
self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
}
self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
} else {
// check if the distance exceeds the threshold for backfill sync
self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
};

// If this is invoked after we downloaded a block we can check if this block is the
// finalized block
Expand Down
8 changes: 4 additions & 4 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,20 +842,20 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
"Session disconnected"
);

let mut reason = None;
if let Some(ref err) = error {
let reason = if let Some(ref err) = error {
// If the connection was closed due to an error, we report
// the peer
self.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
reason = err.as_disconnected();
err.as_disconnected()
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
}
None
};
self.metrics.closed_sessions.increment(1);
self.update_active_connection_metrics();

Expand Down
9 changes: 5 additions & 4 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,17 @@ impl AuthServerConfig {
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;

let handle = server.start(module.inner.clone());
let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;

if let Some(ipc_server_config) = ipc_server_config {
let ipc_handle = if let Some(ipc_server_config) = ipc_server_config {
let ipc_endpoint_str = ipc_endpoint
.clone()
.unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
let ipc_server = ipc_server_config.build(ipc_endpoint_str);
let res = ipc_server.start(module.inner).await?;
ipc_handle = Some(res);
}
Some(res)
} else {
None
};

Ok(AuthServerHandle { handle: Some(handle), local_addr, secret, ipc_endpoint, ipc_handle })
}
Expand Down
8 changes: 3 additions & 5 deletions crates/rpc/rpc-eth-api/src/helpers/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,15 @@ pub trait LoadState:
{
self.spawn_blocking_io(move |this| {
// first fetch the on chain nonce of the account
let on_chain_account_nonce = this
let mut next_nonce = this
.latest_state()?
.account_nonce(&address)
.map_err(Self::Error::from_eth_err)?
.unwrap_or_default();

let mut next_nonce = on_chain_account_nonce;
// Retrieve the highest consecutive transaction for the sender from the transaction pool
if let Some(highest_tx) = this
.pool()
.get_highest_consecutive_transaction_by_sender(address, on_chain_account_nonce)
if let Some(highest_tx) =
this.pool().get_highest_consecutive_transaction_by_sender(address, next_nonce)
{
// Return the nonce of the highest consecutive transaction + 1
next_nonce = highest_tx.nonce().checked_add(1).ok_or_else(|| {
Expand Down
10 changes: 10 additions & 0 deletions crates/scroll/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ alloy-primitives.workspace = true
alloy-rpc-types-eth.workspace = true
alloy-consensus.workspace = true
revm.workspace = true
alloy-transport.workspace = true
alloy-json-rpc.workspace = true
alloy-rpc-client.workspace = true
alloy-transport-http.workspace = true

# reqwest
reqwest = { workspace = true, default-features = false, features = ["rustls-tls-native-roots"] }

# tracing
tracing.workspace = true

# async
parking_lot.workspace = true
Expand Down
36 changes: 36 additions & 0 deletions crates/scroll/rpc/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! RPC errors specific to Scroll.

use alloy_json_rpc::ErrorPayload;
use alloy_rpc_types_eth::BlockError;
use alloy_transport::{RpcError, TransportErrorKind};
use jsonrpsee_types::error::INTERNAL_ERROR_CODE;
use reth_evm::execute::ProviderError;
use reth_rpc_convert::transaction::EthTxEnvError;
use reth_rpc_eth_api::{AsEthApiError, TransactionConversionError};
Expand All @@ -13,12 +16,16 @@ pub enum ScrollEthApiError {
/// L1 ethereum error.
#[error(transparent)]
Eth(#[from] EthApiError),
/// Sequencer client error.
#[error(transparent)]
Sequencer(#[from] SequencerClientError),
}

impl AsEthApiError for ScrollEthApiError {
fn as_err(&self) -> Option<&EthApiError> {
match self {
Self::Eth(err) => Some(err),
_ => None,
}
}
}
Expand All @@ -27,6 +34,7 @@ impl From<ScrollEthApiError> for jsonrpsee_types::error::ErrorObject<'static> {
fn from(err: ScrollEthApiError) -> Self {
match err {
ScrollEthApiError::Eth(err) => err.into(),
ScrollEthApiError::Sequencer(err) => err.into(),
}
}
}
Expand Down Expand Up @@ -69,3 +77,31 @@ impl From<ProviderError> for ScrollEthApiError {
Self::Eth(EthApiError::from(value))
}
}

/// Error type when interacting with the Sequencer
#[derive(Debug, thiserror::Error)]
pub enum SequencerClientError {
/// Wrapper around an [`RpcError<TransportErrorKind>`].
#[error(transparent)]
HttpError(#[from] RpcError<TransportErrorKind>),
/// Thrown when serializing transaction to forward to sequencer
#[error("invalid sequencer transaction")]
InvalidSequencerTransaction,
}

impl From<SequencerClientError> for jsonrpsee_types::error::ErrorObject<'static> {
fn from(err: SequencerClientError) -> Self {
match err {
SequencerClientError::HttpError(RpcError::ErrorResp(ErrorPayload {
code,
message,
data,
})) => jsonrpsee_types::error::ErrorObject::owned(code as i32, message, data),
err => jsonrpsee_types::error::ErrorObject::owned(
INTERNAL_ERROR_CODE,
err.to_string(),
None::<String>,
),
}
}
}
47 changes: 42 additions & 5 deletions crates/scroll/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Scroll-Reth `eth_` endpoint implementation.
use alloy_primitives::U256;
use eyre::WrapErr;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_evm::ConfigureEvm;
use reth_network_api::NetworkInfo;
Expand Down Expand Up @@ -40,6 +41,8 @@ mod pending_block;
pub mod receipt;
pub mod transaction;

use crate::SequencerClient;

/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
pub type EthApiNodeBackend<N> = EthApiInner<
<N as RpcNodeCore>::Provider,
Expand Down Expand Up @@ -73,8 +76,8 @@ pub struct ScrollEthApi<N: ScrollNodeCore, NetworkT = Scroll> {

impl<N: ScrollNodeCore, NetworkT> ScrollEthApi<N, NetworkT> {
/// Creates a new [`ScrollEthApi`].
pub fn new(eth_api: EthApiNodeBackend<N>) -> Self {
let inner = Arc::new(ScrollEthApiInner { eth_api });
pub fn new(eth_api: EthApiNodeBackend<N>, sequencer_client: Option<SequencerClient>) -> Self {
let inner = Arc::new(ScrollEthApiInner { eth_api, sequencer_client });
Self {
inner: inner.clone(),
_nt: PhantomData,
Expand All @@ -98,6 +101,11 @@ where
self.inner.eth_api()
}

/// Returns the configured sequencer client, if any.
pub fn sequencer_client(&self) -> Option<&SequencerClient> {
self.inner.sequencer_client()
}

/// Return a builder for the [`ScrollEthApi`].
pub const fn builder() -> ScrollEthApiBuilder {
ScrollEthApiBuilder::new()
Expand Down Expand Up @@ -307,23 +315,41 @@ impl<N: ScrollNodeCore, NetworkT> fmt::Debug for ScrollEthApi<N, NetworkT> {
pub struct ScrollEthApiInner<N: ScrollNodeCore> {
/// Gateway to node's core components.
pub eth_api: EthApiNodeBackend<N>,
/// Sequencer client, configured to forward submitted transactions to sequencer of given Scroll
/// network.
sequencer_client: Option<SequencerClient>,
}

impl<N: ScrollNodeCore> ScrollEthApiInner<N> {
/// Returns a reference to the [`EthApiNodeBackend`].
const fn eth_api(&self) -> &EthApiNodeBackend<N> {
&self.eth_api
}

/// Returns the configured sequencer client, if any.
const fn sequencer_client(&self) -> Option<&SequencerClient> {
self.sequencer_client.as_ref()
}
}

/// A type that knows how to build a [`ScrollEthApi`].
#[derive(Debug, Default)]
pub struct ScrollEthApiBuilder {}
pub struct ScrollEthApiBuilder {
/// Sequencer client, configured to forward submitted transactions to sequencer of given Scroll
/// network.
sequencer_url: Option<String>,
}

impl ScrollEthApiBuilder {
/// Creates a [`ScrollEthApiBuilder`] instance.
pub const fn new() -> Self {
Self {}
Self { sequencer_url: None }
}

/// With a [`SequencerClient`].
pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
self.sequencer_url = sequencer_url;
self
}
}

Expand All @@ -335,6 +361,7 @@ where
type EthApi = ScrollEthApi<N>;

async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
let Self { sequencer_url } = self;
let eth_api = reth_rpc::EthApiBuilder::new(
ctx.components.provider().clone(),
ctx.components.pool().clone(),
Expand All @@ -350,6 +377,16 @@ where
.proof_permits(ctx.config.proof_permits)
.build_inner();

Ok(ScrollEthApi::new(eth_api))
let sequencer_client = if let Some(url) = sequencer_url {
Some(
SequencerClient::new(&url)
.await
.wrap_err_with(|| "Failed to init sequencer client with: {url}")?,
)
} else {
None
};

Ok(ScrollEthApi::new(eth_api, sequencer_client))
}
}
Loading
Loading