Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions bin/stateless-validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ tracing.workspace = true

[dev-dependencies]
# misc
base64.workspace = true
jsonrpsee.workspace = true
jsonrpsee-types.workspace = true
tempfile.workspace = true
tracing-subscriber.workspace = true
zstd.workspace = true

# stateless
stateless-test-utils = { path = "../../crates/stateless-test-utils" }
Expand Down
25 changes: 4 additions & 21 deletions bin/stateless-validator/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{collections::HashMap, sync::Arc};

use alloy_primitives::{B256, BlockHash};
use alloy_rpc_types_eth::Block;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use clap::Parser;
use jsonrpsee::{
RpcModule,
Expand All @@ -16,7 +15,7 @@ use jsonrpsee::{
use jsonrpsee_types::error::{
CALL_EXECUTION_FAILED_CODE, ErrorObject, ErrorObjectOwned, INVALID_PARAMS_CODE,
};
use stateless_common::{RpcClient, WitnessRequestKeys};
use stateless_common::{RpcClient, WitnessRequestKeys, encode_witness_response};
use stateless_core::{
ChainStore, ContractStore, PipelineConfig, db::BlockMeta, pipeline::run_pipeline,
withdrawals::MptWitness,
Expand Down Expand Up @@ -353,25 +352,9 @@ async fn setup_mock_rpc_server(
)
})?;

let encoded = bincode::serde::encode_to_vec(
&(salt_witness, mpt_witness),
bincode::config::legacy(),
)
.map_err(|e| {
make_rpc_error(
CALL_EXECUTION_FAILED_CODE,
format!("Failed to serialize witness: {e}"),
)
})
.and_then(|raw| {
zstd::encode_all(raw.as_slice(), 9).map_err(|e| {
make_rpc_error(
CALL_EXECUTION_FAILED_CODE,
format!("Failed to compress witness: {e}"),
)
})
})
.map(|compressed| format!("v0:{}", BASE64.encode(compressed)))?;
let encoded = encode_witness_response(&salt_witness, &mpt_witness).map_err(|e| {
make_rpc_error(CALL_EXECUTION_FAILED_CODE, format!("Failed to encode witness: {e}"))
})?;

Ok::<_, ErrorObject<'static>>(encoded)
})
Expand Down
1 change: 1 addition & 0 deletions crates/stateless-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ zstd.workspace = true
[dev-dependencies]
jsonrpsee.workspace = true
kanal.workspace = true
stateless-test-utils = { path = "../stateless-test-utils" }
tokio-util.workspace = true
6 changes: 6 additions & 0 deletions crates/stateless-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ pub use rpc_client::{
BackoffPolicy, CodeFetchError, RpcClient, RpcClientConfig, RpcDeadlineExceeded,
SetValidatedBlocksResponse, WitnessRequestKeys,
};
pub mod witness_encoding;
pub use witness_encoding::{
WITNESS_RESPONSE_VERSION_PREFIX, WitnessDecodingError, WitnessEncodingError,
decode_witness_payload, decode_witness_response, encode_witness_payload,
encode_witness_response,
};
pub mod witness_size;
pub use witness_size::{WitnessSizeBreakdown, estimate_witness_size};

Expand Down
21 changes: 7 additions & 14 deletions crates/stateless-common/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use std::{
use alloy_primitives::{B256, Bytes, U64};
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
use alloy_rpc_types_eth::{Block, BlockId, BlockNumberOrTag, Header};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use eyre::{Context, Result, ensure, eyre};
use futures::future;
use op_alloy_network::Optimism;
Expand All @@ -51,6 +50,7 @@ use tracing::{trace, warn};

use crate::{
metrics::{RpcMethod, RpcMetrics},
witness_encoding::decode_witness_response,
witness_size::WitnessSizeBreakdown,
};

Expand Down Expand Up @@ -1028,8 +1028,8 @@ async fn do_get_header(

/// Fetches and decodes witness data from a single RPC provider (one attempt, no retry).
///
/// Format: `"v0:<base64>"` string → base64-decode → zstd-decompress → bincode-legacy-decode
/// into `(SaltWitness, MptWitness)`.
/// Decodes the versioned `mega_getBlockWitness` response with
/// [`decode_witness_response`](crate::decode_witness_response).
async fn fetch_witness_raw(
provider: &RootProvider,
number: u64,
Expand All @@ -1045,16 +1045,8 @@ async fn fetch_witness_raw(
let decode_start = Instant::now();
let (salt_witness, mpt_witness) =
tokio::task::spawn_blocking(move || -> Result<(SaltWitness, MptWitness)> {
let b64_data = encoded
.strip_prefix("v0:")
.ok_or_else(|| eyre!("Witness response missing 'v0:' prefix"))?;
let compressed = BASE64.decode(b64_data).context("base64 decode failed")?;
let decompressed =
zstd::decode_all(compressed.as_slice()).context("zstd decompress failed")?;
let (witness, _): ((SaltWitness, MptWitness), _) =
bincode::serde::decode_from_slice(&decompressed, bincode::config::legacy())
.context("bincode deserialize failed")?;
Ok(witness)
decode_witness_response(&encoded)
.map_err(|e| eyre!("failed to decode witness response: {e}"))
})
.await
.context("decode task panicked")??;
Expand Down Expand Up @@ -1156,6 +1148,7 @@ mod tests {
use tokio_util::sync::CancellationToken;

use super::*;
use crate::witness_encoding::WITNESS_RESPONSE_VERSION_PREFIX;

const LOCALHOST_A: &str = "http://localhost:8545";
const LOCALHOST_B: &str = "http://localhost:8546";
Expand Down Expand Up @@ -1569,7 +1562,7 @@ mod tests {
order.lock().unwrap().push(*label);
// Stub that fails base64/zstd decode, so the round continues to the next
// provider and we see the full routing order.
Ok::<_, ErrorObjectOwned>("v0:AAAA".to_string())
Ok::<_, ErrorObjectOwned>(format!("{WITNESS_RESPONSE_VERSION_PREFIX}AAAA"))
})
.unwrap();
})
Expand Down
167 changes: 167 additions & 0 deletions crates/stateless-common/src/witness_encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
//! Shared witness wire-format helpers for RPC producers and consumers.
//!
//! This module encodes and decodes the temporary compatibility contract used by
//! `mega_getBlockWitness`:
//! `v0:base64(zstd(level=9, bincode-legacy((SaltWitness, MptWitness))))`.

use std::io;

use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use salt::SaltWitness;
use stateless_core::withdrawals::MptWitness;

/// Version prefix for the RPC response format:
/// `"v0:" + base64(zstd(level=9, bincode-legacy((SaltWitness, MptWitness))))`.
pub const WITNESS_RESPONSE_VERSION_PREFIX: &str = "v0:";

/// Errors produced while serializing or compressing a witness payload.
#[derive(Debug, thiserror::Error)]
pub enum WitnessEncodingError {
#[error("failed to serialize witness: {0}")]
Serialize(#[from] bincode::error::EncodeError),
#[error("failed to compress witness payload: {0}")]
Compress(#[from] io::Error),
}

/// Errors produced while decoding a witness payload or RPC response.
#[derive(Debug, thiserror::Error)]
pub enum WitnessDecodingError {
#[error("witness response missing '{WITNESS_RESPONSE_VERSION_PREFIX}' prefix")]
MissingPrefix,
#[error("failed to decode witness base64 payload: {0}")]
Base64(#[from] base64::DecodeError),
#[error("failed to decompress witness payload: {0}")]
Decompress(#[from] io::Error),
#[error("failed to deserialize witness: {0}")]
Deserialize(#[from] bincode::error::DecodeError),
}

/// Serializes and compresses the witness tuple into the binary payload carried by
/// the versioned RPC response.
///
/// Returns `(uncompressed_size, compressed_payload)`. `uncompressed_size` is the length of
/// the bincode-serialized tuple *before* zstd compression; producers use it on the upload
/// path for compression-ratio statistics. [`encode_witness_response`] discards it.
pub fn encode_witness_payload(
salt_witness: &SaltWitness,
withdrawal_witness: &MptWitness,
) -> Result<(usize, Vec<u8>), WitnessEncodingError> {
let original_data = bincode::serde::encode_to_vec(
(salt_witness, withdrawal_witness),
bincode::config::legacy(),
)?;
Comment thread
abelmega marked this conversation as resolved.
let original_size = original_data.len();
let compressed = zstd::encode_all(original_data.as_slice(), 9)?;
Ok((original_size, compressed))
}

/// Decompresses and deserializes the binary payload carried by the versioned RPC
/// response.
pub fn decode_witness_payload(
compressed: &[u8],
) -> Result<(SaltWitness, MptWitness), WitnessDecodingError> {
let decompressed = zstd::decode_all(compressed)?;
let (witness, _) = bincode::serde::decode_from_slice(&decompressed, bincode::config::legacy())?;
Ok(witness)
}

/// Encodes the witness tuple as a versioned RPC response string.
pub fn encode_witness_response(
salt_witness: &SaltWitness,
withdrawal_witness: &MptWitness,
) -> Result<String, WitnessEncodingError> {
let (_, compressed) = encode_witness_payload(salt_witness, withdrawal_witness)?;
Ok(format!("{WITNESS_RESPONSE_VERSION_PREFIX}{}", BASE64.encode(compressed)))
}

/// Decodes a versioned RPC response string into the witness tuple.
pub fn decode_witness_response(
response: &str,
) -> Result<(SaltWitness, MptWitness), WitnessDecodingError> {
let payload = response
.strip_prefix(WITNESS_RESPONSE_VERSION_PREFIX)
.ok_or(WitnessDecodingError::MissingPrefix)?;
let compressed = BASE64.decode(payload)?;
decode_witness_payload(&compressed)
}

#[cfg(test)]
mod tests {
use stateless_test_utils::fixtures::TestFixtures;

use super::*;

fn first_fixture_witness() -> (SaltWitness, MptWitness) {
let fixtures = TestFixtures::mainnet();
let (_, hash) = fixtures
.paired_blocks()
.into_iter()
.next()
.expect("mainnet fixtures should contain paired witnesses");
let salt_witness = fixtures.salt_witnesses[&hash].clone();
let (mpt_witness, _): (MptWitness, usize) = bincode::serde::decode_from_slice(
&fixtures.mpt_witness_bytes[&hash],
bincode::config::legacy(),
)
.expect("fixture MPT witness should decode");
(salt_witness, mpt_witness)
}

#[test]
fn encode_witness_payload_roundtrip() {
let (salt_witness, mpt_witness) = first_fixture_witness();

let (original_size, compressed) = encode_witness_payload(&salt_witness, &mpt_witness)
.expect("compression should succeed");
let decompressed =
zstd::decode_all(compressed.as_slice()).expect("decompression should succeed");
let (decoded, _): ((SaltWitness, MptWitness), usize) =
bincode::serde::decode_from_slice(&decompressed, bincode::config::legacy())
.expect("deserialization should succeed");

assert_eq!(original_size, decompressed.len());
assert_eq!(decoded.0, salt_witness);
assert_eq!(decoded.1, mpt_witness);
}

#[test]
fn encode_witness_response_roundtrip() {
let (salt_witness, mpt_witness) = first_fixture_witness();

let encoded =
encode_witness_response(&salt_witness, &mpt_witness).expect("encoding should succeed");
let decoded = decode_witness_response(&encoded).expect("decoding should succeed");

assert_eq!(decoded.0, salt_witness);
assert_eq!(decoded.1, mpt_witness);
}

#[test]
fn decode_witness_response_requires_prefix() {
let err = decode_witness_response("not-versioned").expect_err("missing prefix should fail");
assert!(matches!(err, WitnessDecodingError::MissingPrefix));
}
Comment thread
abelmega marked this conversation as resolved.

#[test]
fn decode_witness_response_invalid_base64() {
let err = decode_witness_response("v0:!!!").expect_err("invalid base64 should fail");
assert!(matches!(err, WitnessDecodingError::Base64(_)));
}

#[test]
fn decode_witness_response_invalid_payload() {
let err = decode_witness_response("v0:AAAA").expect_err("corrupt payload should fail");
assert!(matches!(err, WitnessDecodingError::Decompress(_)));
}

#[test]
fn decode_witness_response_invalid_witness() {
// Valid base64 and a valid zstd frame, but the decompressed bytes are not a
// bincode-encoded `(SaltWitness, MptWitness)` tuple.
let compressed =
zstd::encode_all(&b"not a witness"[..], 9).expect("compression should succeed");
let response = format!("{WITNESS_RESPONSE_VERSION_PREFIX}{}", BASE64.encode(compressed));
let err = decode_witness_response(&response).expect_err("corrupt witness should fail");
assert!(matches!(err, WitnessDecodingError::Deserialize(_)));
}
}
Loading