Skip to content

Commit c7af05c

Browse files
frisitanogreged93
andauthored
feat: L1 message delay (#64)
* test: move calldata to files * feat: batch header decoding * feat: improve codec interface * chore: manifests fixes * feat: revert some codec changes * feat: wip derivation pipeline * feat: batch header v7 * feat: add batch abstraction * feat: basic derivation pipeline * feat: implement batch data hash * feat: move PayloadData * feat: improve batch data hash computation * test: derivation * chore: cleaning * fix: lints * fix: lints * fix: skip wasm for derivation pipeline * fix: data hash computation for batch * fix: lint * fix: lint * fix: lints * fix: answer comments * fix: lints * feat: wip * feat: changes to the data model * fix: codec issue * feat: modify derivation pipeline to fetch blob * test: move test data to file * fix: lints * fix: answer comments * test: fix migration * fix: comments * feat: providers crate * feat: l1 providers crate * feat: l1 provider implementation * feat: l1 message provider * feat: pipeline modifications * fix: avoid iterating the cache * chore: simplify derivation pipeline interface * fix: lints * fix: rebasing * fix: answer comments * feat: add sequencer * sequencer initial implementation * sequencer implementation * sequencer implementation * refactor sequencer provider * sequencer implementation * lint * lint deps * address comments * feat: sequencer integration * remove redundant file * add std feature for scroll-reth-primitives * comments and clean up * feat: L1 message delay * address feedback --------- Co-authored-by: Gregory Edison <[email protected]>
1 parent 9faafb3 commit c7af05c

File tree

9 files changed

+203
-122
lines changed

9 files changed

+203
-122
lines changed

bin/rollup/src/args.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub struct L1ProviderArgs {
4545
pub initial_backoff: u64,
4646
}
4747

48-
#[derive(Debug, clap::Args)]
48+
#[derive(Debug, Clone, clap::Args)]
4949
pub struct SequencerArgs {
5050
/// The block time for the sequencer.
5151
#[arg(long)]

bin/rollup/src/network.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,21 @@ where
154154
// Construct the l1 provider.
155155
let beacon_provider = beacon_provider(l1_provider_args.beacon_rpc_url.to_string());
156156
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
157-
let l1_provider =
158-
OnlineL1Provider::new(beacon_provider, PROVIDER_BLOB_CACHE_SIZE, l1_messages_provider)
159-
.await;
157+
let l1_provider = OnlineL1Provider::new(
158+
beacon_provider,
159+
PROVIDER_BLOB_CACHE_SIZE,
160+
l1_messages_provider.clone(),
161+
)
162+
.await;
160163

161164
// Construct the Sequencer.
162165
let (sequencer, block_time) = if let Some(args) = self.config.sequencer_args {
163-
let message_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
164166
let sequencer = Sequencer::new(
165-
Arc::new(message_provider),
167+
Arc::new(l1_messages_provider),
166168
args.fee_recipient.unwrap_or_default(),
167169
args.max_l1_messages_per_block,
170+
0,
171+
0,
168172
);
169173
(Some(sequencer), Some(args.block_time))
170174
} else {

crates/node/src/lib.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use futures::StreamExt;
44
use reth_tokio_util::{EventSender, EventStream};
55
use rollup_node_indexer::{Indexer, IndexerEvent};
6-
use rollup_node_sequencer::{Sequencer, SequencerL1MessageProvider};
6+
use rollup_node_sequencer::Sequencer;
77
use rollup_node_watcher::L1Notification;
88
use scroll_alloy_provider::ScrollEngineApi;
99
use scroll_engine::{EngineDriver, EngineDriverEvent};
@@ -31,7 +31,7 @@ pub use consensus::PoAConsensus;
3131
mod consensus;
3232

3333
use consensus::Consensus;
34-
use rollup_node_providers::{ExecutionPayloadProvider, L1Provider};
34+
use rollup_node_providers::{ExecutionPayloadProvider, L1MessageProvider, L1Provider};
3535
use scroll_db::Database;
3636
use scroll_derivation_pipeline::DerivationPipeline;
3737

@@ -51,7 +51,7 @@ const EVENT_CHANNEL_SIZE: usize = 100;
5151
/// - `forkchoice_state`: The forkchoice state of the rollup node.
5252
/// - `pending_block_imports`: A collection of pending block imports.
5353
/// - `event_sender`: An event sender for sending events to subscribers of the rollup node manager.
54-
pub struct RollupNodeManager<C, EC, P, L1P, SMP> {
54+
pub struct RollupNodeManager<C, EC, P, L1P, L1MP> {
5555
/// The network manager that manages the scroll p2p network.
5656
network: NetworkManager,
5757
/// The engine driver used to communicate with the engine.
@@ -69,13 +69,13 @@ pub struct RollupNodeManager<C, EC, P, L1P, SMP> {
6969
/// An event sender for sending events to subscribers of the rollup node manager.
7070
event_sender: Option<EventSender<RollupEvent>>,
7171
/// The sequencer which is responsible for sequencing transactions and producing new blocks.
72-
sequencer: Option<Sequencer<SMP>>,
72+
sequencer: Option<Sequencer<L1MP>>,
7373
/// The trigger for the block building process.
7474
block_building_trigger: Option<Interval>,
7575
}
7676

77-
impl<C: Debug, EC: Debug, P: Debug, L1P: Debug, SMP: Debug> Debug
78-
for RollupNodeManager<C, EC, P, L1P, SMP>
77+
impl<C: Debug, EC: Debug, P: Debug, L1P: Debug, L1MP: Debug> Debug
78+
for RollupNodeManager<C, EC, P, L1P, L1MP>
7979
{
8080
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
8181
f.debug_struct("RollupNodeManager")
@@ -93,13 +93,13 @@ impl<C: Debug, EC: Debug, P: Debug, L1P: Debug, SMP: Debug> Debug
9393
}
9494
}
9595

96-
impl<C, EC, P, L1P, SMP> RollupNodeManager<C, EC, P, L1P, SMP>
96+
impl<C, EC, P, L1P, L1MP> RollupNodeManager<C, EC, P, L1P, L1MP>
9797
where
9898
C: Consensus + Unpin,
9999
EC: ScrollEngineApi + Unpin + Sync + Send + 'static,
100100
P: ExecutionPayloadProvider + Unpin + Send + Sync + 'static,
101101
L1P: L1Provider + Clone + Send + Sync + 'static,
102-
SMP: SequencerL1MessageProvider + Unpin + Send + Sync + 'static,
102+
L1MP: L1MessageProvider + Unpin + Send + Sync + 'static,
103103
{
104104
/// Create a new [`RollupNodeManager`] instance.
105105
#[allow(clippy::too_many_arguments)]
@@ -111,7 +111,7 @@ where
111111
l1_notification_rx: Option<Receiver<Arc<L1Notification>>>,
112112
consensus: C,
113113
new_block_rx: Option<UnboundedReceiver<NewBlockWithPeer>>,
114-
sequencer: Option<Sequencer<SMP>>,
114+
sequencer: Option<Sequencer<L1MP>>,
115115
block_time: Option<u64>,
116116
) -> Self {
117117
let indexer = Indexer::new(database.clone());
@@ -224,13 +224,13 @@ where
224224
}
225225
}
226226

227-
impl<C, EC, P, L1P, SMP> Future for RollupNodeManager<C, EC, P, L1P, SMP>
227+
impl<C, EC, P, L1P, L1MP> Future for RollupNodeManager<C, EC, P, L1P, L1MP>
228228
where
229229
C: Consensus + Unpin,
230230
EC: ScrollEngineApi + Unpin + Sync + Send + 'static,
231231
P: ExecutionPayloadProvider + Unpin + Send + Sync + 'static,
232232
L1P: L1Provider + Clone + Unpin + Send + Sync + 'static,
233-
SMP: SequencerL1MessageProvider + Unpin + Send + Sync + 'static,
233+
L1MP: L1MessageProvider + Unpin + Send + Sync + 'static,
234234
{
235235
type Output = ();
236236

crates/providers/src/l1/message/database.rs

-78
Original file line numberDiff line numberDiff line change
@@ -49,81 +49,3 @@ impl<DB: DatabaseConnectionProvider + Sync> L1MessageProvider for DatabaseL1Mess
4949
self.index.fetch_add(1, Ordering::Relaxed);
5050
}
5151
}
52-
53-
/// A provider that can provide L1 messages with a delay.
54-
/// This provider is used to delay the L1 messages by a certain number of blocks which builds
55-
/// confidence in the L1 message not being reorged.
56-
#[derive(Debug)]
57-
pub struct DatabaseL1MessageDelayProvider<DB> {
58-
/// The database L1 message provider.
59-
l1_message_provider: DatabaseL1MessageProvider<DB>,
60-
/// The current L1 block number.
61-
l1_tip: AtomicU64,
62-
/// The number of blocks to wait for before including a L1 message in a block.
63-
l1_message_delay: u64,
64-
}
65-
66-
impl<DB> DatabaseL1MessageDelayProvider<DB> {
67-
/// Returns a new instance of the [`DatabaseL1MessageDelayProvider`].
68-
pub fn new(
69-
l1_message_provider: DatabaseL1MessageProvider<DB>,
70-
l1_tip_block_number: u64,
71-
l1_message_delay: u64,
72-
) -> Self {
73-
Self { l1_message_provider, l1_tip: l1_tip_block_number.into(), l1_message_delay }
74-
}
75-
76-
/// Sets the block number of the current L1 head.
77-
pub fn set_l1_head(&self, block_number: u64) {
78-
self.l1_tip.store(block_number, Ordering::Relaxed);
79-
}
80-
}
81-
82-
/// A trait that allows the L1 message delay provider to set the current head number.
83-
pub trait L1MessageDelayProvider {
84-
/// Set the number of the current L1 head block number.
85-
fn set_l1_head(&self, _block_number: u64) {}
86-
}
87-
88-
impl<DB> L1MessageDelayProvider for DatabaseL1MessageDelayProvider<DB> {
89-
fn set_l1_head(&self, block_number: u64) {
90-
Self::set_l1_head(self, block_number);
91-
}
92-
}
93-
94-
impl<DB> L1MessageDelayProvider for DatabaseL1MessageProvider<DB> {}
95-
96-
#[async_trait::async_trait]
97-
impl<DB: DatabaseConnectionProvider + Sync> L1MessageProvider
98-
for DatabaseL1MessageDelayProvider<DB>
99-
{
100-
type Error = L1ProviderError;
101-
102-
async fn get_l1_message_with_block_number(
103-
&self,
104-
) -> Result<Option<L1MessageWithBlockNumber>, Self::Error> {
105-
let msg_w_bn = self.l1_message_provider.get_l1_message_with_block_number().await?;
106-
let result = if let Some(msg_w_bn) = msg_w_bn {
107-
let tx_block_number = msg_w_bn.block_number;
108-
let depth = self.l1_tip.load(Ordering::Relaxed) - tx_block_number;
109-
(depth >= self.l1_message_delay).then_some(msg_w_bn)
110-
} else {
111-
None
112-
};
113-
114-
Ok(result)
115-
}
116-
117-
fn set_index_cursor(&self, index: u64) {
118-
self.l1_message_provider.set_index_cursor(index);
119-
}
120-
121-
fn set_hash_cursor(&self, _hash: B256) {
122-
// TODO: issue 43
123-
todo!()
124-
}
125-
126-
fn increment_cursor(&self) {
127-
self.l1_message_provider.increment_cursor();
128-
}
129-
}

crates/providers/src/l1/message/mod.rs

+26-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use scroll_alloy_consensus::TxL1Message;
66
use scroll_db::{DatabaseConnectionProvider, DatabaseOperations};
77

88
mod database;
9-
pub use database::{
10-
DatabaseL1MessageDelayProvider, DatabaseL1MessageProvider, L1MessageDelayProvider,
11-
};
9+
pub use database::DatabaseL1MessageProvider;
1210

1311
/// An instance of the trait can provide L1 messages using a cursor approach. Set the cursor for the
1412
/// provider using the queue index or hash and then call
@@ -37,12 +35,37 @@ pub trait L1MessageProvider {
3735
}
3836
}
3937

38+
/// Returns the L1 message with block number at the current cursor and advances the cursor if
39+
/// the predicate is satisfied.
40+
async fn next_l1_message_with_block_number_and_predicate(
41+
&self,
42+
predicate: impl Fn(L1MessageWithBlockNumber) -> bool + Send,
43+
) -> Result<Option<L1MessageWithBlockNumber>, Self::Error> {
44+
match self.get_l1_message_with_block_number().await? {
45+
Some(message) if predicate(message.clone()) => {
46+
self.increment_cursor();
47+
Ok(Some(message))
48+
}
49+
_ => Ok(None),
50+
}
51+
}
52+
4053
/// Returns the L1 message with block number at the current cursor and advances the cursor.
4154
async fn next_l1_message(&self) -> Result<Option<TxL1Message>, Self::Error> {
4255
let message = self.next_l1_message_with_block_number().await?;
4356
Ok(message.map(|message| message.transaction))
4457
}
4558

59+
/// Returns the L1 message with block number at the current cursor and advances the cursor if
60+
/// the predicate is satisfied.
61+
async fn next_l1_message_with_predicate(
62+
&self,
63+
predicate: impl Fn(L1MessageWithBlockNumber) -> bool + Send,
64+
) -> Result<Option<TxL1Message>, Self::Error> {
65+
let message = self.next_l1_message_with_block_number_and_predicate(predicate).await?;
66+
Ok(message.map(|message| message.transaction))
67+
}
68+
4669
/// Returns the L1 message at the current cursor.
4770
/// This method does not advance the cursor.
4871
async fn get_l1_message(&self) -> Result<Option<TxL1Message>, Self::Error> {

crates/providers/src/lib.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ mod execution_payload;
1212

1313
pub use l1::{
1414
blob::L1BlobProvider,
15-
message::{
16-
DatabaseL1MessageDelayProvider, DatabaseL1MessageProvider, L1MessageDelayProvider,
17-
L1MessageProvider,
18-
},
15+
message::{DatabaseL1MessageProvider, L1MessageProvider},
1916
L1Provider, L1ProviderError, OnlineL1Provider,
2017
};
2118
mod l1;

crates/sequencer/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ reth-scroll-primitives.workspace = true
2626
# rollup-node
2727
scroll-db = { workspace = true }
2828
rollup-node-providers.workspace = true
29+
rollup-node-primitives.workspace = true
2930

3031
# misc
3132
futures.workspace = true

crates/sequencer/src/lib.rs

+41-19
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use alloy_eips::eip2718::Encodable2718;
1212
use alloy_primitives::Address;
1313
use alloy_rpc_types_engine::PayloadAttributes;
1414
use futures::{task::AtomicWaker, Stream};
15-
use rollup_node_providers::{L1MessageDelayProvider, L1MessageProvider};
15+
use rollup_node_primitives::L1MessageWithBlockNumber;
16+
use rollup_node_providers::L1MessageProvider;
1617
use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes;
1718
use std::task::{Context, Poll};
1819

@@ -23,34 +24,42 @@ pub use error::SequencerError;
2324
pub type PayloadBuildingJobFuture =
2425
Pin<Box<dyn Future<Output = Result<ScrollPayloadAttributes, SequencerError>> + Send>>;
2526

26-
/// A trait used to define the L1 message provider for the sequencer.
27-
pub trait SequencerL1MessageProvider: L1MessageProvider + L1MessageDelayProvider {}
28-
impl<T> SequencerL1MessageProvider for T where T: L1MessageProvider + L1MessageDelayProvider {}
29-
3027
/// The sequencer is responsible for sequencing transactions and producing new blocks.
31-
pub struct Sequencer<SMP> {
28+
pub struct Sequencer<P> {
3229
/// A reference to the database
33-
provider: Arc<SMP>,
30+
provider: Arc<P>,
3431
/// The fee recipient
3532
fee_recipient: Address,
3633
/// The number of L1 messages to include in each block.
3734
max_l1_messages_per_block: u64,
35+
/// The current l1 block number.
36+
l1_block_number: u64,
37+
/// The L1 block depth at which L1 messages should be included in the payload.
38+
l1_block_depth: u64,
3839
/// The inflight payload attributes job
3940
payload_attributes_job: Option<PayloadBuildingJobFuture>,
4041
/// A waker to notify when the Sequencer should be polled.
4142
waker: AtomicWaker,
4243
}
4344

44-
impl<SMP> Sequencer<SMP>
45+
impl<P> Sequencer<P>
4546
where
46-
SMP: SequencerL1MessageProvider + Unpin + Send + Sync + 'static,
47+
P: L1MessageProvider + Unpin + Send + Sync + 'static,
4748
{
4849
/// Creates a new sequencer.
49-
pub fn new(provider: Arc<SMP>, fee_recipient: Address, max_l1_messages_per_block: u64) -> Self {
50+
pub fn new(
51+
provider: Arc<P>,
52+
fee_recipient: Address,
53+
max_l1_messages_per_block: u64,
54+
l1_block_number: u64,
55+
l1_block_depth: u64,
56+
) -> Self {
5057
Self {
5158
provider,
5259
fee_recipient,
5360
max_l1_messages_per_block,
61+
l1_block_number,
62+
l1_block_depth,
5463
payload_attributes_job: None,
5564
waker: AtomicWaker::new(),
5665
}
@@ -79,23 +88,32 @@ where
7988
};
8089
let max_l1_messages = self.max_l1_messages_per_block;
8190
let database = self.provider.clone();
91+
let l1_block_number = self.l1_block_number;
92+
let l1_block_depth = self.l1_block_depth;
8293

8394
self.payload_attributes_job = Some(Box::pin(async move {
84-
build_payload_attributes(database, max_l1_messages, payload_attributes).await
95+
build_payload_attributes(
96+
database,
97+
max_l1_messages,
98+
payload_attributes,
99+
l1_block_number,
100+
l1_block_depth,
101+
)
102+
.await
85103
}));
86104

87105
self.waker.wake();
88106
}
89107

90108
/// Handle a reorg event.
91-
pub fn handle_reorg(&mut self, queue_index: u64, block_number: u64) {
109+
pub fn handle_reorg(&mut self, queue_index: u64, l1_block_number: u64) {
92110
self.provider.set_index_cursor(queue_index);
93-
self.provider.set_l1_head(block_number);
111+
self.l1_block_number = l1_block_number;
94112
}
95113

96114
/// Handle a new L1 block.
97115
pub fn handle_new_l1_block(&mut self, block_number: u64) {
98-
self.provider.set_l1_head(block_number);
116+
self.l1_block_number = block_number;
99117
}
100118
}
101119

@@ -127,17 +145,21 @@ impl<SMP> Stream for Sequencer<SMP> {
127145
/// Builds the payload attributes for the sequencer using the given L1 message provider.
128146
/// It collects the L1 messages to include in the payload and returns a `ScrollPayloadAttributes`
129147
/// instance.
130-
async fn build_payload_attributes<
131-
SMP: SequencerL1MessageProvider + Unpin + Send + Sync + 'static,
132-
>(
133-
provider: Arc<SMP>,
148+
async fn build_payload_attributes<P: L1MessageProvider + Unpin + Send + Sync + 'static>(
149+
provider: Arc<P>,
134150
max_l1_messages: u64,
135151
payload_attributes: PayloadAttributes,
152+
current_l1_block_number: u64,
153+
l1_block_depth: u64,
136154
) -> Result<ScrollPayloadAttributes, SequencerError> {
155+
let predicate = |message: L1MessageWithBlockNumber| {
156+
message.block_number + l1_block_depth <= current_l1_block_number
157+
};
158+
137159
// Collect L1 messages to include in payload.
138160
let mut l1_messages = vec![];
139161
for _ in 0..max_l1_messages {
140-
match provider.next_l1_message().await.map_err(Into::into)? {
162+
match provider.next_l1_message_with_predicate(predicate).await.map_err(Into::into)? {
141163
Some(l1_message) => {
142164
l1_messages.push(l1_message.encoded_2718().into());
143165
}

0 commit comments

Comments
 (0)