Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions crates/apollo_base_layer_tests/src/anvil_base_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,38 +195,45 @@ impl BaseLayerContract for AnvilBaseLayer {
type Error = EthereumBaseLayerError;

async fn get_proved_block_at(
&self,
&mut self,
l1_block: L1BlockNumber,
) -> Result<BlockHashAndNumber, Self::Error> {
self.ethereum_base_layer.get_proved_block_at(l1_block).await
}

async fn latest_l1_block_number(&self) -> Result<L1BlockNumber, Self::Error> {
async fn latest_l1_block_number(&mut self) -> Result<L1BlockNumber, Self::Error> {
self.ethereum_base_layer.latest_l1_block_number().await
}

async fn l1_block_at(
&self,
&mut self,
block_number: L1BlockNumber,
) -> Result<Option<L1BlockReference>, Self::Error> {
self.ethereum_base_layer.l1_block_at(block_number).await
}

async fn events<'a>(
&'a self,
&'a mut self,
block_range: RangeInclusive<L1BlockNumber>,
event_identifiers: &'a [&'a str],
) -> Result<Vec<L1Event>, Self::Error> {
self.ethereum_base_layer.events(block_range, event_identifiers).await
}

async fn get_block_header(
&self,
&mut self,
block_number: L1BlockNumber,
) -> Result<Option<L1BlockHeader>, Self::Error> {
self.ethereum_base_layer.get_block_header(block_number).await
}

async fn get_block_header_immutable(
&self,
block_number: L1BlockNumber,
) -> Result<Option<L1BlockHeader>, Self::Error> {
self.ethereum_base_layer.get_block_header_immutable(block_number).await
}

// TODO(Arni): Consider deleting this function from the trait.
async fn get_url(&self) -> Result<Url, Self::Error> {
Ok(self.ethereum_base_layer.url_iterator.get_current_url())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn anvil_starts_with_no_contract() {
ordered_l1_endpoint_urls: vec![anvil.endpoint_url()],
..Default::default()
};
let base_layer = EthereumBaseLayerContract::new(base_layer_config.clone());
let mut base_layer = EthereumBaseLayerContract::new(base_layer_config.clone());

let sender_address = ARBITRARY_ANVIL_L1_ACCOUNT_ADDRESS;
let receiver_address = OTHER_ARBITRARY_ANVIL_L1_ACCOUNT_ADDRESS;
Expand Down
8 changes: 4 additions & 4 deletions crates/apollo_central_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub struct GenericStateSync<
central_source: Arc<TCentralSource>,
pending_source: Arc<TPendingSource>,
pending_classes: Arc<RwLock<PendingClasses>>,
base_layer_source: Option<Arc<TBaseLayerSource>>,
base_layer_source: Option<Arc<Mutex<TBaseLayerSource>>>,
reader: StorageReader,
writer: Arc<Mutex<StorageWriter>>,
sequencer_pub_key: Option<SequencerPublicKey>,
Expand Down Expand Up @@ -915,7 +915,7 @@ impl StateSync {
writer: StorageWriter,
class_manager_client: Option<SharedClassManagerClient>,
) -> Self {
let base_layer_source = base_layer_source.map(Arc::new);
let base_layer_source = base_layer_source.map(|source| Arc::new(Mutex::new(source)));
Self {
config,
shared_highest_block,
Expand Down Expand Up @@ -1004,15 +1004,15 @@ fn stream_new_compiled_classes<TCentralSource: CentralSourceTrait + Sync + Send>
// TODO(dvir): consider combine this function and store_base_layer_block.
fn stream_new_base_layer_block<TBaseLayerSource: BaseLayerSourceTrait + Sync>(
reader: StorageReader,
base_layer_source: Arc<TBaseLayerSource>,
base_layer_source: Arc<Mutex<TBaseLayerSource>>,
base_layer_propagation_sleep_duration: Duration,
) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
try_stream! {
loop {
tokio::time::sleep(base_layer_propagation_sleep_duration).await;
let header_marker = reader.begin_ro_txn()?.get_header_marker()?;

match base_layer_source.latest_proved_block().await? {
match base_layer_source.lock().await.latest_proved_block().await? {
Some((block_number, _block_hash)) if header_marker <= block_number => {
debug!(
"Sync headers ({header_marker}) is behind the base layer tip \
Expand Down
4 changes: 2 additions & 2 deletions crates/apollo_central_sync/src/sources/base_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<Error: std::error::Error + Sync + Send> BaseLayerSourceErrorTrait for Error
#[async_trait]
pub trait BaseLayerSourceTrait {
async fn latest_proved_block(
&self,
&mut self,
) -> Result<Option<(BlockNumber, BlockHash)>, BaseLayerSourceError>;
}

Expand All @@ -39,7 +39,7 @@ impl<
> BaseLayerSourceTrait for BaseLayerSource
{
async fn latest_proved_block(
&self,
&mut self,
) -> Result<Option<(BlockNumber, BlockHash)>, BaseLayerSourceError> {
let finality = 0;
let latest_l1_block_number = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async fn run_sync(
central_source: Arc::new(central),
pending_source: Arc::new(pending_source),
pending_classes: Arc::new(RwLock::new(PendingClasses::default())),
base_layer_source: Some(Arc::new(base_layer)),
base_layer_source: Some(Arc::new(Mutex::new(base_layer))),
reader,
writer: Arc::new(Mutex::new(writer)),
sequencer_pub_key: None,
Expand Down
8 changes: 5 additions & 3 deletions crates/apollo_central_sync/src/sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ async fn stream_new_base_layer_block_test_header_marker() {
let mut mock = MockBaseLayerSourceTrait::new();
mock.expect_latest_proved_block().times(4).returning(move || Ok(iter.next()));
let mut stream =
stream_new_base_layer_block(reader, Arc::new(mock), Duration::from_millis(0)).boxed();
stream_new_base_layer_block(reader, Arc::new(Mutex::new(mock)), Duration::from_millis(0))
.boxed();

let event = stream.next().await.unwrap().unwrap();
assert_matches!(event, SyncEvent::NewBaseLayerBlock { block_number: BlockNumber(1), .. });
Expand All @@ -167,7 +168,8 @@ async fn stream_new_base_layer_block_no_blocks_on_base_layer() {
mock.expect_latest_proved_block().times(2).returning(move || Ok(values.next().unwrap()));

let mut stream =
stream_new_base_layer_block(reader, Arc::new(mock), Duration::from_millis(0)).boxed();
stream_new_base_layer_block(reader, Arc::new(Mutex::new(mock)), Duration::from_millis(0))
.boxed();

let event = stream.next().await.unwrap().unwrap();
assert_matches!(event, SyncEvent::NewBaseLayerBlock { block_number: BlockNumber(1), .. });
Expand Down Expand Up @@ -201,7 +203,7 @@ async fn store_base_layer_block_test() {
central_source: Arc::new(MockCentralSourceTrait::new()),
pending_source: Arc::new(MockPendingSourceTrait::new()),
pending_classes: Arc::new(RwLock::new(PendingClasses::default())),
base_layer_source: Some(Arc::new(MockBaseLayerSourceTrait::new())),
base_layer_source: Some(Arc::new(Mutex::new(MockBaseLayerSourceTrait::new()))),
reader,
writer: Arc::new(Mutex::new(writer)),
sequencer_pub_key: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use starknet_api::{calldata, contract_address, felt};
async fn events_from_other_contract() {
const EVENT_IDENTIFIERS: &[EventIdentifier] = &[LOG_MESSAGE_TO_L2_EVENT_IDENTIFIER];

let anvil_base_layer = AnvilBaseLayer::new(None).await;
let mut anvil_base_layer = AnvilBaseLayer::new(None).await;
// Anvil base layer already auto-deployed a starknet contract.
let this_contract = &anvil_base_layer.ethereum_base_layer.contract;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use starknet_api::transaction::{L1HandlerTransaction, TransactionHasher, Transac
#[tokio::test]
async fn scraper_end_to_end() {
// Setup.
let base_layer = AnvilBaseLayer::new(None).await;
let mut base_layer = AnvilBaseLayer::new(None).await;
let contract = &base_layer.ethereum_base_layer.contract;
let mut l1_provider_client = MockL1ProviderClient::default();

Expand Down Expand Up @@ -61,7 +61,7 @@ async fn scraper_end_to_end() {
let receipt = msg.send().await.unwrap().get_receipt().await.unwrap();
l1_handler_timestamps.push(
base_layer
.get_block_header(receipt.block_number.unwrap())
.get_block_header_immutable(receipt.block_number.unwrap())
.await
.unwrap()
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn latest_proved_block_ethereum() {
))),
};

let base_layer = AnvilBaseLayer::new(None).await;
let mut base_layer = AnvilBaseLayer::new(None).await;
let provider = &base_layer.anvil_provider;

let mut current_block = provider.get_block_number().await.expect("Failed to get block number");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber};

#[tokio::test]
async fn test_mocked_starknet_state_update() {
let base_layer = AnvilBaseLayer::new(None).await;
let mut base_layer = AnvilBaseLayer::new(None).await;

// Check that the contract was initialized (during the construction above).
let genesis_block_number = 1;
Expand Down
2 changes: 1 addition & 1 deletion crates/apollo_l1_gas_price/src/l1_gas_price_scraper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl<B: BaseLayerContract + Send + Sync + Debug> L1GasPriceScraper<B> {
Ok(())
}

async fn latest_l1_block_number(&self) -> L1GasPriceScraperResult<L1BlockNumber, B> {
async fn latest_l1_block_number(&mut self) -> L1GasPriceScraperResult<L1BlockNumber, B> {
let latest_l1_block_number = self
.base_layer
.latest_l1_block_number()
Expand Down
38 changes: 25 additions & 13 deletions crates/apollo_l1_provider/src/l1_scraper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::any::type_name;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use apollo_infra::component_client::ClientError;
Expand Down Expand Up @@ -66,10 +67,15 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer

#[instrument(skip(self), err)]
pub async fn run(&mut self) -> L1ScraperResult<(), BaseLayerType> {
// Each of the following function is use as a closure sent to
// retry_until_base_layer_success. This will retry each of them, in turn, in an
// endless loop until they succeed. Since they have different return types, we have
// to use box/pin to pass them as a closure.

// Try to get all the information needed for initialization.
self.scrape_from_this_l1_block = Some(
self.retry_until_base_layer_success(
|_| self.fetch_start_block(),
|this| Box::pin(this.fetch_start_block()),
"fetching start block",
)
.await?,
Expand All @@ -78,7 +84,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
// The last historic L2 height is returned, to be using in initialization.
let historic_l2_height = self
.retry_until_base_layer_success(
|_| self.get_last_historic_l2_height(),
|this| Box::pin(this.get_last_historic_l2_height()),
"fetching last historic L2 height",
)
.await?;
Expand All @@ -87,7 +93,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
// Update the L1 block number we need to scrape from.
self.scrape_from_this_l1_block = Some(
self.retry_until_base_layer_success(
|_| self.initialize(historic_l2_height),
|this| Box::pin(this.initialize(historic_l2_height)),
"initializing",
)
.await?,
Expand All @@ -114,7 +120,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
/// Use config.startup_rewind_time_seconds to estimate an L1 block number
/// that is far enough back to start scraping from.
pub async fn fetch_start_block(
&self,
&mut self,
) -> Result<L1BlockReference, L1ScraperError<BaseLayerType>> {
// Define the safety margin (e.g., extra 50% over the required number of blocks).
const SAFTEY_MARGIN_NUMERATOR: u64 = 3;
Expand Down Expand Up @@ -162,7 +168,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
}

/// Get the last historic L2 height that was proved before the start block number.
async fn get_last_historic_l2_height(&self) -> L1ScraperResult<BlockNumber, BaseLayerType> {
async fn get_last_historic_l2_height(&mut self) -> L1ScraperResult<BlockNumber, BaseLayerType> {
let Some(start_block) = self.scrape_from_this_l1_block else {
panic!(
"Should never get last historic L2 height without first getting the last \
Expand Down Expand Up @@ -193,7 +199,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
/// was restarted).
#[instrument(skip(self), err)]
async fn initialize(
&self,
&mut self,
historic_l2_height: BlockNumber,
) -> L1ScraperResult<L1BlockReference, BaseLayerType> {
let (latest_l1_block, events) = self.fetch_events().await?;
Expand Down Expand Up @@ -238,7 +244,9 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
}

/// Query the L1 base layer for all events since scrape_from_this_l1_block.
async fn fetch_events(&self) -> L1ScraperResult<(L1BlockReference, Vec<Event>), BaseLayerType> {
async fn fetch_events(
&mut self,
) -> L1ScraperResult<(L1BlockReference, Vec<Event>), BaseLayerType> {
let scrape_timestamp = self.clock.unix_now();

let latest_l1_block_number = self
Expand Down Expand Up @@ -339,14 +347,18 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer

/// Retry the given function if it gets base layer errors. Returns the result of the function in
/// case of success or the error in case of failure.
async fn retry_until_base_layer_success<T, FuncType, Fut>(
&self,
func: FuncType,
async fn retry_until_base_layer_success<T, FuncType>(
&mut self,
mut func: FuncType,
description: &str,
) -> L1ScraperResult<T, BaseLayerType>
// Accept closure that takes a mutable reference to self, returning a future returning a result.
where
FuncType: Fn(&Self) -> Fut,
Fut: Future<Output = L1ScraperResult<T, BaseLayerType>>,
for<'a> FuncType: FnMut(
&'a mut Self,
) -> Pin<
Box<dyn Future<Output = Result<T, L1ScraperError<BaseLayerType>>> + Send + 'a>,
>,
{
loop {
match func(self).await {
Expand All @@ -367,7 +379,7 @@ impl<BaseLayerType: BaseLayerContract + Send + Sync + Debug> L1Scraper<BaseLayer
/// Fetch scrape_from_this_l1_block again, check it still exists and that its hash is the same.
/// If a reorg occurred up to this block, return an error (existing data in the provider is
/// stale).
async fn assert_no_l1_reorgs(&self) -> L1ScraperResult<(), BaseLayerType> {
async fn assert_no_l1_reorgs(&mut self) -> L1ScraperResult<(), BaseLayerType> {
let Some(scrape_from_this_l1_block) = self.scrape_from_this_l1_block else {
panic!(
"Should never assert no l1 reorgs without first getting the last processed L1 \
Expand Down
6 changes: 3 additions & 3 deletions crates/apollo_l1_provider/tests/flow_test_cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::utils::WAIT_FOR_ASYNC_PROCESSING_DURATION;
async fn new_l1_handler_tx_propose_validate_cancellation_timelock() {
// Setup.
// Setup the base layer.
let base_layer = setup_anvil_base_layer().await;
let mut base_layer = setup_anvil_base_layer().await;

let (l2_hash, nonce) = send_message_from_l1_to_l2(&base_layer, CALL_DATA).await;
let (l2_hash, nonce) = send_message_from_l1_to_l2(&mut base_layer, CALL_DATA).await;

let l1_provider_client =
setup_scraper_and_provider(base_layer.ethereum_base_layer.clone()).await;
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn new_l1_handler_tx_propose_validate_cancellation_timelock() {
// TODO(guyn): check that the event gets deleted, after we add that functionality.

// Check that the scraper and provider are still working.
let (new_l2_hash, _nonce) = send_message_from_l1_to_l2(&base_layer, CALL_DATA_2).await;
let (new_l2_hash, _nonce) = send_message_from_l1_to_l2(&mut base_layer, CALL_DATA_2).await;

assert_ne!(new_l2_hash, l2_hash);

Expand Down
4 changes: 2 additions & 2 deletions crates/apollo_l1_provider/tests/flow_test_one_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ async fn new_l1_handler_tx_propose_validate_cooldown() {
// Setup.

// Setup the base layer.
let base_layer = setup_anvil_base_layer().await;
let mut base_layer = setup_anvil_base_layer().await;

let (l2_hash, _nonce) = send_message_from_l1_to_l2(&base_layer, CALL_DATA).await;
let (l2_hash, _nonce) = send_message_from_l1_to_l2(&mut base_layer, CALL_DATA).await;

let l1_provider_client =
setup_scraper_and_provider(base_layer.ethereum_base_layer.clone()).await;
Expand Down
4 changes: 2 additions & 2 deletions crates/apollo_l1_provider/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ pub(crate) async fn setup_scraper_and_provider<
// Need to allow dead code as this is only used in some of the test crates.
#[allow(dead_code)]
pub(crate) async fn send_message_from_l1_to_l2(
base_layer: &AnvilBaseLayer,
base_layer: &mut AnvilBaseLayer,
call_data: &[u8],
) -> (TransactionHash, Uint<256, 4>) {
let contract = &base_layer.ethereum_base_layer.contract;
let last_l1_block_number =
base_layer.ethereum_base_layer.latest_l1_block_number().await.unwrap();
assert!(last_l1_block_number > START_L1_BLOCK_NUMBER + NUMBER_OF_BLOCKS_TO_MINE);

// Send message from L1 to L2.
let contract = &base_layer.ethereum_base_layer.contract;
let call_data = convert_call_data_to_u256(call_data);
let fee = 1_u8;
let message_to_l2 = contract
Expand Down
Loading
Loading