refactor(common): resolve network via BFS through dataset dependencies #1878
Conversation
Replace direct catalog network collection with BFS traversal of dataset dependencies to find raw datasets, enabling correct resolution for derived datasets.

- Add `resolve_network_from_dependencies()` BFS function and `ResolveNetworkError` type
- Replace `SpawnError::MultiNetwork` with `SpawnError::ResolveNetwork` wrapping the new error
- Remove per-table network collection in admin-api schema handler
- Add dummy network placeholder in schema endpoint for TS manifest builder compat

Signed-off-by: Lorenzo Delgado <lorenzo@edgeandnode.com>

This is a spinoff from #1849
Pull request overview
Refactors streaming query initialization to resolve the target blockchain network by traversing dataset dependency graphs (BFS) to correctly handle derived datasets, and adjusts the admin-api schema endpoint to avoid per-table network collection.
Changes:
- Added `resolve_network_from_dependencies()` (BFS) and `ResolveNetworkError`, and replaced `SpawnError::MultiNetwork` with `SpawnError::ResolveNetwork`.
- Updated streaming query spawn flow to determine network via dataset dependency traversal before resolving the blocks table.
- Removed catalog-wide network collection from the admin-api schema handler and returned a placeholder network for TS manifest-builder compatibility.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| crates/services/admin-api/src/handlers/schema.rs | Stops collecting networks from the planning catalog per table; returns a placeholder network in the schema response. |
| crates/core/common/src/streaming_query.rs | Introduces BFS-based network resolution through dataset dependencies and updates streaming query spawn error handling accordingly. |
```rust
/// Resolve the network by BFS through dataset dependencies to find the first raw (non-derived)
/// dataset and return its network.
async fn resolve_network_from_dependencies(
    dataset_store: &DatasetStore,
    root_dataset_refs: impl Iterator<Item = &HashReference>,
) -> Result<NetworkId, ResolveNetworkError> {
    let mut found_network: Option<NetworkId> = None;
```
`resolve_network_from_dependencies()` introduces new behavior (BFS traversal, multi-network detection, missing dependency handling) but there are no unit tests covering these cases. Since this file already has a test module, please add tests for: (1) derived-to-raw resolution, (2) the multiple-raw-networks error, and (3) the no-raw-dataset-found / missing-dependency paths.
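One way to make the multi-network case unit-testable without mocking `DatasetStore` would be to extract the conflict check into a pure helper. A hypothetical sketch of that idea follows; the `fold_network` helper, the stripped-down `ResolveNetworkError`, and the `String` stand-in for `NetworkId` are illustrative, not code from this PR:

```rust
// Hypothetical sketch: the multi-network check extracted into a pure
// helper so it can be unit-tested in isolation. `String` stands in for
// the repo's NetworkId type; the error enum is a minimal stand-in.

#[derive(Debug, PartialEq)]
enum ResolveNetworkError {
    MultipleNetworks(String, String),
}

/// Fold a newly discovered raw-dataset network into the running result,
/// erroring if it conflicts with a previously seen network.
fn fold_network(
    found: Option<String>,
    next: String,
) -> Result<Option<String>, ResolveNetworkError> {
    match found {
        None => Ok(Some(next)),
        Some(prev) if prev == next => Ok(Some(prev)),
        Some(prev) => Err(ResolveNetworkError::MultipleNetworks(prev, next)),
    }
}

fn main() {
    // First raw network found simply wins.
    assert_eq!(fold_network(None, "mainnet".into()), Ok(Some("mainnet".into())));
    // Seeing the same network twice is fine.
    assert_eq!(
        fold_network(Some("mainnet".into()), "mainnet".into()),
        Ok(Some("mainnet".into()))
    );
    // Two different raw networks must error.
    assert!(fold_network(Some("mainnet".into()), "sepolia".into()).is_err());
    println!("ok");
}
```

The BFS loop would then only be responsible for traversal, and the ambiguity rule gets cheap table-driven tests.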
```rust
// TODO: Remove this dummy network once the TS manifest builder no longer requires
// exactly one network from the schema endpoint.
networks: vec!["derived".parse().expect("valid network id")],
```
The schema endpoint now returns a hard-coded placeholder network (`"derived"`) for every table. This makes the `networks` field incorrect relative to its documented meaning (networks referenced by the query) and is a breaking API behavior change for any consumers relying on real network values. Consider computing the real networks once per request (outside the per-table loop) or introducing a separate field/flag for the TS manifest-builder workaround so existing consumers can still rely on `networks`.
```diff
- // TODO: Remove this dummy network once the TS manifest builder no longer requires
- // exactly one network from the schema endpoint.
- networks: vec!["derived".parse().expect("valid network id")],
+ // NOTE: We no longer return a hard-coded dummy network here to avoid
+ // misleading API consumers. If network information is required,
+ // it should be derived explicitly from table references elsewhere.
+ networks: Vec::new(),
```
```rust
/// This occurs when BFS through dataset dependencies fails to find a raw
/// (non-derived) dataset whose network can be used for the streaming query.
```
The `SpawnError::ResolveNetwork` doc comment says this error occurs when BFS fails to find a raw dataset, but `ResolveNetworkError` also covers multiple networks, missing dependencies, and store/revision resolution failures. Please update the comment to reflect the full set of failure modes so callers/logs aren't misled during debugging.
```diff
- /// This occurs when BFS through dataset dependencies fails to find a raw
- /// (non-derived) dataset whose network can be used for the streaming query.
+ /// This error is raised when the system cannot determine a single
+ /// blockchain network for the streaming query based on the dataset
+ /// dependency graph. This can happen when:
+ /// - BFS through dataset dependencies fails to find a raw (non-derived)
+ ///   dataset whose network can be used for the streaming query
+ /// - The dependency graph implies multiple different networks (ambiguous
+ ///   network selection)
+ /// - Required dataset dependencies are missing or cannot be loaded
+ /// - Underlying store or revision resolution for a dataset fails
+ ///
+ /// See [`ResolveNetworkError`] for the precise failure variant.
```
```rust
/// Resolve the network by BFS through dataset dependencies to find the first raw (non-derived)
/// dataset and return its network.
async fn resolve_network_from_dependencies(
    dataset_store: &DatasetStore,
    root_dataset_refs: impl Iterator<Item = &HashReference>,
) -> Result<NetworkId, ResolveNetworkError> {
    let mut found_network: Option<NetworkId> = None;
    let mut queue: VecDeque<Arc<dyn datasets_common::dataset::Dataset>> = VecDeque::new();
```
`resolve_network_from_dependencies()` largely duplicates the BFS traversal logic in `search_dependencies_for_raw_dataset()` (queue/visited/dependency expansion). Consider factoring out a shared BFS walker (e.g., over `HashReference`s) to avoid the two implementations drifting over time and to keep dependency-walk behavior consistent.
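A sketch of what such a shared walker might look like, generic over an opaque node key with the "expand dependencies" step injected as a closure. This is hypothetical: the real code is async and would walk `HashReference`s, and `bfs_walk` and the toy graph below are not from the PR:

```rust
use std::collections::{HashSet, VecDeque};

// Hypothetical shared BFS walker. Both resolve_network_from_dependencies()
// and search_dependencies_for_raw_dataset() could reuse this loop, each
// supplying its own `expand` step (in the real code, an async dataset fetch).
fn bfs_walk<K, F, E>(roots: Vec<K>, mut expand: F) -> Result<Vec<K>, E>
where
    K: std::hash::Hash + Eq + Clone,
    F: FnMut(&K) -> Result<Vec<K>, E>,
{
    let mut visited: HashSet<K> = HashSet::new();
    let mut queue: VecDeque<K> = roots.into();
    let mut order = Vec::new();
    while let Some(node) = queue.pop_front() {
        if !visited.insert(node.clone()) {
            continue; // already processed; skip duplicate work
        }
        for dep in expand(&node)? {
            if !visited.contains(&dep) {
                queue.push_back(dep);
            }
        }
        order.push(node);
    }
    Ok(order)
}

fn main() {
    // Toy dependency graph: "a" depends on "b" and "c"; both depend on "d".
    let deps = |k: &&str| -> Result<Vec<&str>, ()> {
        Ok(match *k {
            "a" => vec!["b", "c"],
            "b" | "c" => vec!["d"],
            _ => vec![],
        })
    };
    let order = bfs_walk(vec!["a"], deps).unwrap();
    // "d" is visited only once despite having two incoming edges.
    assert_eq!(order, vec!["a", "b", "c", "d"]);
    println!("{:?}", order);
}
```

With the traversal centralized, the two call sites reduce to different `expand` closures plus different per-node logic, so dedup/visited semantics cannot drift apart.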
```rust
if let Some(derived) = dataset.downcast_ref::<DerivedDataset>() {
    for dep in derived.dependencies().values() {
        let hash_ref = dataset_store
            .resolve_revision(dep.to_reference())
            .await
            .map_err(ResolveNetworkError::ResolveRevision)?
            .ok_or_else(|| ResolveNetworkError::NotFound(dep.to_reference().to_string()))?;
        let dataset = dataset_store
            .get_dataset(&hash_ref)
            .await
            .map_err(ResolveNetworkError::GetDataset)?;
        queue.push_back(dataset);
```
In the dependency-enqueue loop, the code resolves each dependency and calls `dataset_store.get_dataset(&hash_ref)` before the visited check runs (which happens only after popping). If multiple derived datasets (or multiple deps) reference the same revision, this will fetch/parse the same dataset repeatedly. Consider queueing `HashReference`s (or checking/inserting `hash_ref` into `visited` before calling `get_dataset`) so duplicates are skipped without extra store round-trips.
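A toy before/after comparison of the fetch counts makes the difference concrete. Everything here is a stand-in: `deps` models the dependency graph and the counters model `resolve_revision` + `get_dataset` round-trips (the real calls are async):

```rust
use std::collections::{HashSet, VecDeque};

// Toy graph: "b" and "c" both depend on the same revision "d".
fn deps(key: &str) -> Vec<&'static str> {
    match key {
        "a" => vec!["b", "c"],
        "b" | "c" => vec!["d"],
        _ => vec![],
    }
}

/// Current shape: fetch on every dependency edge; visited checked only on pop.
fn fetches_per_edge() -> usize {
    let (mut fetches, mut visited, mut queue) =
        (0, HashSet::new(), VecDeque::from(vec!["a"]));
    while let Some(node) = queue.pop_front() {
        if !visited.insert(node) {
            continue;
        }
        for dep in deps(node) {
            fetches += 1; // get_dataset() runs even for already-seen revisions
            queue.push_back(dep);
        }
    }
    fetches
}

/// Suggested shape: mark the hash ref visited BEFORE fetching it.
fn fetches_deduped() -> usize {
    let (mut fetches, mut visited, mut queue) =
        (0, HashSet::from(["a"]), VecDeque::from(vec!["a"]));
    while let Some(node) = queue.pop_front() {
        for dep in deps(node) {
            if visited.insert(dep) {
                fetches += 1; // fetch only the first time a revision is seen
                queue.push_back(dep);
            }
        }
    }
    fetches
}

fn main() {
    // "d" has two parents, so the per-edge version fetches it twice.
    assert_eq!(fetches_per_edge(), 4);
    assert_eq!(fetches_deduped(), 3);
    println!("per-edge: {}, deduped: {}", fetches_per_edge(), fetches_deduped());
}
```

On a diamond-shaped dependency graph the savings grow with the number of shared revisions; on deep diamonds the per-edge version can refetch the same subtree many times.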
```rust
async fn resolve_network_from_dependencies(
    dataset_store: &DatasetStore,
    root_dataset_refs: impl Iterator<Item = &HashReference>,
) -> Result<NetworkId, ResolveNetworkError> {
```
Can we return the dataset too, so we can avoid using `search_dependencies_for_raw_dataset()`, which also does a BFS to find raw datasets?