Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 57 additions & 89 deletions crates/core/common/src/streaming_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,12 @@ pub enum SpawnError {
info: crate::plan_visitors::CrossNetworkJoinInfo,
},

/// Query references tables from multiple blockchain networks
/// Failed to resolve raw dataset from dependencies
///
/// Streaming queries must operate on a single blockchain network. This error
/// occurs when the catalog contains tables from different networks (e.g., both
/// Ethereum mainnet and Polygon).
///
/// Common causes:
/// - User query joins tables across different networks
/// - Catalog construction error including wrong network tables
/// - Dataset dependency resolution pulling in multi-network data
///
/// The networks field lists all detected networks in the query.
///
/// Streaming queries require a single chain for consistent block ordering
/// and watermark tracking.
#[error("multi-network streaming queries are not supported: {networks:?}")]
MultiNetwork { networks: Vec<NetworkId> },
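/// This occurs when the breadth-first search through dataset dependencies cannot
/// produce a single raw (non-derived) dataset whose network can be used for the
/// streaming query: a dataset or revision fails to resolve, a dependency is missing,
/// no raw dataset exists, or the raw datasets span more than one network.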
#[error("failed to resolve raw dataset from dependencies")]
ResolveRawDataset(#[source] ResolveRawDatasetError),

/// Failed to resolve blocks table for network
///
Expand Down Expand Up @@ -383,29 +372,22 @@ impl StreamingQuery {
.map_err(SpawnError::OptimizePlan)?
};

// Collect network and dataset ref data before moving catalog.
// Resolve the network by walking dataset dependencies to find a raw dataset,
// then resolve the blocks table for that network.
let (network, blocks_table) = {
let networks: BTreeSet<&NetworkId> =
catalog.physical_tables().map(|t| t.network()).collect();
if networks.len() != 1 {
let networks: Vec<NetworkId> = networks.into_iter().cloned().collect();
return Err(SpawnError::MultiNetwork { networks });
}
let network = networks.into_iter().next().unwrap().clone();

let unique_refs: BTreeSet<HashReference> = catalog
.physical_tables()
.map(|t| t.dataset_reference().clone())
.collect();

let blocks_table = resolve_blocks_table(
dataset_store,
exec_env.store.clone(),
unique_refs.iter(),
&network,
)
.await
.map_err(SpawnError::ResolveBlocksTable)?;
let (network, raw_dataset) =
resolve_raw_dataset_from_dependencies(dataset_store, unique_refs.iter())
.await
.map_err(SpawnError::ResolveRawDataset)?;

let blocks_table = resolve_blocks_table(raw_dataset, exec_env.store.clone())
.await
.map_err(SpawnError::ResolveBlocksTable)?;

(network, blocks_table)
};
Expand Down Expand Up @@ -1166,15 +1148,9 @@ pub fn keep_alive_stream<'a>(

/// Return a table identifier, in the form `{dataset}.blocks`, for the given dataset.
async fn resolve_blocks_table(
dataset_store: &DatasetStore,
dataset: Arc<dyn Dataset>,
data_store: DataStore,
root_dataset_refs: impl Iterator<Item = &HashReference>,
network: &NetworkId,
) -> Result<CatalogTable, ResolveBlocksTableError> {
let dataset = search_dependencies_for_raw_dataset(dataset_store, root_dataset_refs, network)
.await
.map_err(ResolveBlocksTableError::SearchDependencies)?;

let table = dataset
.tables()
.iter()
Expand Down Expand Up @@ -1210,12 +1186,6 @@ async fn resolve_blocks_table(
/// This error type is used by `resolve_blocks_table()`.
#[derive(Debug, thiserror::Error)]
pub enum ResolveBlocksTableError {
/// Failed to search dependencies for raw dataset
///
/// This occurs when searching for a raw dataset matching the target network fails.
#[error("Failed to search dependencies for raw dataset")]
SearchDependencies(#[source] SearchDependenciesForRawDatasetError),

/// Blocks table not found
///
/// This occurs when the blocks table is not found in the dataset.
Expand All @@ -1235,96 +1205,94 @@ pub enum ResolveBlocksTableError {
TableNotSynced(String, String),
}

// Breadth-first search over dataset dependencies to find a raw dataset matching the target network.
async fn search_dependencies_for_raw_dataset(
/// Resolve the raw dataset and its network by BFS through dataset dependencies.
///
/// Returns the first raw (non-derived) dataset found and its network, validating that all raw
/// datasets in the dependency tree belong to the same network.
async fn resolve_raw_dataset_from_dependencies(
dataset_store: &DatasetStore,
root_dataset_refs: impl Iterator<Item = &HashReference>,
network: &NetworkId,
) -> Result<Arc<dyn Dataset>, SearchDependenciesForRawDatasetError> {
) -> Result<(NetworkId, Arc<dyn Dataset>), ResolveRawDatasetError> {
let mut found: Option<(NetworkId, Arc<dyn Dataset>)> = None;
let mut queue: VecDeque<Arc<dyn datasets_common::dataset::Dataset>> = VecDeque::new();
for hash_ref in root_dataset_refs {
let dataset = dataset_store
.get_dataset(hash_ref)
.await
.map_err(SearchDependenciesForRawDatasetError::GetDataset)?;
.map_err(ResolveRawDatasetError::GetDataset)?;
queue.push_back(dataset);
}

let mut visited = BTreeSet::new();
while let Some(dataset) = queue.pop_front() {
let dataset_ref = dataset.reference().clone();

// Skip duplicates
if !visited.insert(dataset_ref) {
continue;
}

// Check if this is a raw dataset matching the target network
// Raw dataset: record its network, fail if a second network appears
if !dataset.is::<DerivedDataset>()
&& let Some(table) = dataset.tables().first()
&& table.network() == network
{
// Found matching dataset
return Ok(dataset);
let network = table.network().clone();
match &found {
None => {
found = Some((network, dataset.clone()));
}
Some((first, _)) if *first != network => {
return Err(ResolveRawDatasetError::MultipleNetworks {
first: first.clone(),
second: network,
});
}
Some(_) => {} // same network, continue BFS
}
}

// Enqueue dependencies for exploration (only derived datasets have dependencies)
// Derived dataset: enqueue dependencies
if let Some(derived) = dataset.downcast_ref::<DerivedDataset>() {
for dep in derived.dependencies().values() {
// Resolve the reference to a hash reference first
let hash_ref = dataset_store
.resolve_revision(dep.to_reference())
.await
.map_err(SearchDependenciesForRawDatasetError::ResolveRevision)?
.map_err(ResolveRawDatasetError::ResolveRevision)?
.ok_or_else(|| {
SearchDependenciesForRawDatasetError::NotFound(
dep.to_reference().to_string(),
)
ResolveRawDatasetError::NotFound(dep.to_reference().to_string())
})?;
let dataset = dataset_store
.get_dataset(&hash_ref)
.await
.map_err(SearchDependenciesForRawDatasetError::GetDataset)?;

.map_err(ResolveRawDatasetError::GetDataset)?;
queue.push_back(dataset);
}
}
}

Err(SearchDependenciesForRawDatasetError::NoRawDatasetFound {
network: network.clone(),
})
found.ok_or(ResolveRawDatasetError::NoRawDatasetFound)
}

/// Errors that occur when searching dataset dependencies for a raw dataset
///
/// This error type is used by `search_dependencies_for_raw_dataset()`.
/// Errors that occur when resolving the raw dataset from dependencies.
#[derive(Debug, thiserror::Error)]
pub enum SearchDependenciesForRawDatasetError {
/// Failed to get dataset from dataset store
///
/// This occurs when retrieving the dataset instance from the dataset store fails.
/// The dataset store loads dataset manifests and parses them into Dataset instances.
#[error("Failed to get dataset")]
pub enum ResolveRawDatasetError {
/// Failed to get dataset from dataset store.
#[error("failed to get dataset")]
GetDataset(#[source] crate::dataset_store::GetDatasetError),

/// Failed to resolve revision
///
/// This occurs when resolving a dataset revision fails.
#[error("Failed to resolve revision")]
/// Failed to resolve revision.
#[error("failed to resolve revision")]
ResolveRevision(#[source] ResolveRevisionError),

/// Failed to find dependency
///
/// This occurs when a dependency is not found.
/// Dependency not found.
#[error("dependency '{0}' not found")]
NotFound(String),

/// No raw dataset found for network
///
/// This occurs when no raw dataset is found for the target network.
#[error("no raw dataset found for network '{network}'")]
NoRawDatasetFound { network: NetworkId },
/// Multiple networks found in the dependency tree.
#[error("multiple networks in dependency tree: {first} and {second}")]
MultipleNetworks { first: NetworkId, second: NetworkId },

/// No raw dataset found in the dependency tree.
#[error("no raw dataset found in dependency tree")]
NoRawDatasetFound,
}

#[cfg(test)]
Expand Down
13 changes: 3 additions & 10 deletions crates/services/admin-api/src/handlers/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,20 +359,13 @@ pub async fn handler(
// Prepend the special block number field
let schema = prepend_special_block_num_field(&schema);

// Extract networks from all tables in the catalog
let mut networks: Vec<NetworkId> = planning_ctx
.logical_tables()
.iter()
.map(|t| t.table().network().clone())
.collect();
networks.sort();
networks.dedup();

schemas.insert(
table_name,
TableSchemaWithNetworks {
schema: schema.into(),
networks,
// TODO: Remove this dummy network once the TS manifest builder no longer requires
// exactly one network from the schema endpoint.
networks: vec!["derived".parse().expect("valid network id")],
},
);
}
Expand Down