Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/common/src/catalog/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub mod for_query;
pub mod reader;
pub mod snapshot;

pub use catalog::{Catalog, CatalogTable, EarliestBlockError};
pub use catalog::{Catalog, EarliestBlockError};
64 changes: 16 additions & 48 deletions crates/core/common/src/catalog/physical/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct Catalog {
/// UDFs specific to the datasets corresponding to the resolved tables.
udfs: Vec<ScalarUDF>,
/// The physical catalog entries, each pairing a physical table with SQL naming.
entries: Vec<CatalogTable>,
entries: Vec<(Arc<PhysicalTable>, Arc<str>)>,
/// Dependency alias to hash reference mappings for lazy resolution.
dep_aliases: BTreeMap<String, HashReference>,
}
Expand All @@ -26,7 +26,7 @@ impl Catalog {
pub fn new(
tables: Vec<LogicalTable>,
udfs: Vec<ScalarUDF>,
entries: Vec<CatalogTable>,
entries: Vec<(Arc<PhysicalTable>, Arc<str>)>,
dep_aliases: BTreeMap<String, HashReference>,
) -> Self {
Catalog {
Expand All @@ -43,14 +43,16 @@ impl Catalog {
}

/// Returns the catalog entries.
pub fn entries(&self) -> &[CatalogTable] {
pub fn entries(&self) -> &[(Arc<PhysicalTable>, Arc<str>)] {
&self.entries
}

/// Convenience iterator returning physical tables for consumers that only need
/// physical storage access (e.g. compaction, garbage collection).
pub fn physical_tables(&self) -> impl Iterator<Item = &Arc<PhysicalTable>> {
self.entries.iter().map(|entry| entry.physical_table())
self.entries
.iter()
.map(|(physical_table, _)| physical_table)
}

/// Returns the logical tables.
Expand All @@ -64,7 +66,14 @@ impl Catalog {
}

/// Consumes the catalog, returning its entries, logical tables, and UDFs.
pub fn into_parts(self) -> (Vec<CatalogTable>, Vec<LogicalTable>, Vec<ScalarUDF>) {
#[expect(clippy::type_complexity)]
pub fn into_parts(
self,
) -> (
Vec<(Arc<PhysicalTable>, Arc<str>)>,
Vec<LogicalTable>,
Vec<ScalarUDF>,
) {
(self.entries, self.tables, self.udfs)
}

Expand All @@ -74,9 +83,8 @@ impl Catalog {
/// when no table has synced data.
pub async fn earliest_block(&self) -> Result<Option<BlockNum>, EarliestBlockError> {
let mut earliest = None;
for entry in &self.entries {
let snapshot = entry
.physical_table()
for (physical_table, _) in &self.entries {
let snapshot = physical_table
.snapshot(false)
.await
.map_err(EarliestBlockError::Snapshot)?;
Expand Down Expand Up @@ -120,43 +128,3 @@ impl crate::retryable::RetryableErrorExt for EarliestBlockError {
}
}
}

/// A catalog entry that pairs a physical table with SQL naming information.
///
/// `PhysicalTable` represents pure physical storage (revision, segments, canonical chains,
/// file access). `CatalogTable` adds the SQL catalog identity — the schema string under
/// which the table is registered for SQL queries.
///
/// This separation allows physical-only consumers (compaction, garbage collection, parquet
/// writing) to work with `PhysicalTable` without carrying SQL naming concerns.
#[derive(Debug, Clone)]
pub struct CatalogTable {
    /// The underlying physical table providing storage access (segments, snapshots, file I/O).
    /// Shared via `Arc`, so cloning a `CatalogTable` is cheap (refcount bump only).
    physical_table: Arc<PhysicalTable>,

    /// The dataset reference portion of SQL table references.
    ///
    /// SQL table references have the format `<dataset_ref>.<table>` (e.g., `anvil_rpc.blocks`).
    /// This field stores the string form of the `<dataset_ref>` portion only; the table
    /// name itself lives on the wrapped [`PhysicalTable`].
    sql_schema_name: String,
}

impl CatalogTable {
/// Creates a new catalog table entry pairing a physical table with its SQL schema name.
pub fn new(physical_table: Arc<PhysicalTable>, sql_schema_name: String) -> Self {
Self {
physical_table,
sql_schema_name,
}
}

/// Returns a reference to the underlying physical table.
pub fn physical_table(&self) -> &Arc<PhysicalTable> {
&self.physical_table
}

/// Returns the dataset reference portion for SQL table references.
pub fn sql_schema_name(&self) -> &str {
&self.sql_schema_name
}
}
13 changes: 9 additions & 4 deletions crates/core/common/src/catalog/physical/for_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
//! It resolves dependency tables from manifest deps and SQL table references,
//! then adds physical parquet locations.

use std::collections::{BTreeMap, btree_map::Entry};
use std::{
collections::{BTreeMap, btree_map::Entry},
sync::Arc,
};

use amp_data_store::DataStore;
use datafusion::logical_expr::ScalarUDF;
use datasets_common::{hash::Hash, hash_reference::HashReference, table_name::TableName};
use datasets_derived::deps::DepAlias;

use super::catalog::{Catalog, CatalogTable};
use super::catalog::Catalog;
use crate::{
catalog::logical::LogicalTable,
datasets_cache::{DatasetsCache, GetDatasetError},
Expand Down Expand Up @@ -124,15 +127,17 @@ pub async fn create(
source,
})?;

let sql_schema_name = table.sql_schema_name().to_string();
let physical_table = PhysicalTable::from_revision(
data_store.clone(),
table.dataset_reference().clone(),
dataset.start_block(),
table.table().clone(),
revision,
);
entries.push(CatalogTable::new(physical_table.into(), sql_schema_name));
entries.push((
Arc::from(physical_table),
Arc::from(table.sql_schema_name()),
));
}

// Build dep_aliases map
Expand Down
9 changes: 6 additions & 3 deletions crates/core/common/src/catalog/physical/for_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
//! It resolves tables directly from parsed SQL table references, bypassing the
//! logical catalog.

use std::collections::BTreeSet;
use std::{collections::BTreeSet, sync::Arc};

use amp_data_store::DataStore;
use datasets_common::{
hash_reference::HashReference, partial_reference::PartialReference, reference::Reference,
table_name::TableName,
};

use super::catalog::{Catalog, CatalogTable};
use super::catalog::Catalog;
use crate::{
datasets_cache::{DatasetsCache, GetDatasetError, ResolveRevisionError},
physical_table::table::PhysicalTable,
Expand Down Expand Up @@ -107,7 +107,10 @@ pub async fn create(
dataset_table,
revision,
);
entries.push(CatalogTable::new(physical_table.into(), sql_schema_name));
entries.push((
Arc::from(physical_table),
Arc::from(sql_schema_name.as_str()),
));
}

Ok(Catalog::new(vec![], vec![], entries, Default::default()))
Expand Down
26 changes: 15 additions & 11 deletions crates/core/common/src/catalog/physical/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion::{
use datafusion_datasource::compute_all_files_statistics;
use futures::{Stream, StreamExt as _};

use super::{catalog::CatalogTable, reader};
use super::reader;
use crate::{
BlockRange,
catalog::logical::LogicalTable,
Expand All @@ -43,7 +43,7 @@ pub struct CatalogSnapshot {
/// UDFs specific to the datasets corresponding to the resolved tables.
udfs: Vec<ScalarUDF>,
/// Each snapshot is paired with its SQL table ref schema string.
table_snapshots: Vec<(Arc<PhyTableSnapshot>, String)>,
table_snapshots: Vec<(Arc<PhyTableSnapshot>, Arc<str>)>,
}

impl CatalogSnapshot {
Expand All @@ -54,17 +54,16 @@ impl CatalogSnapshot {
pub async fn from_catalog(
tables: Vec<LogicalTable>,
udfs: Vec<ScalarUDF>,
entries: &[CatalogTable],
entries: &[(Arc<PhysicalTable>, Arc<str>)],
ignore_canonical_segments: bool,
) -> Result<Self, FromCatalogError> {
let mut table_snapshots = Vec::new();
for entry in entries {
let snapshot = entry
.physical_table()
for (physical_table, sql_schema_name) in entries {
let snapshot = physical_table
.snapshot(ignore_canonical_segments)
.await
.map_err(FromCatalogError)?;
table_snapshots.push((Arc::new(snapshot), entry.sql_schema_name().to_string()));
table_snapshots.push((Arc::new(snapshot), Arc::clone(sql_schema_name)));
}

Ok(Self {
Expand All @@ -83,22 +82,27 @@ impl CatalogSnapshot {
pub fn table_snapshots(&self) -> impl Iterator<Item = (&Arc<PhyTableSnapshot>, &str)> {
self.table_snapshots
.iter()
.map(|(s, sql)| (s, sql.as_str()))
.map(|(snapshot, sql_schema_name)| (snapshot, &**sql_schema_name))
}

/// Returns the user-defined functions.
pub fn udfs(&self) -> &[ScalarUDF] {
&self.udfs
}

/// Reconstructs `CatalogTable` entries from the snapshotted data.
/// Reconstructs catalog entries from the snapshotted data.
///
/// Useful for rebuilding a `Catalog` from a snapshot (e.g. for forked-chain contexts).
// TODO: Revisit. Can reconstruct by any other means?
pub fn catalog_entries(&self) -> Vec<CatalogTable> {
pub fn catalog_entries(&self) -> Vec<(Arc<PhysicalTable>, Arc<str>)> {
self.table_snapshots
.iter()
.map(|(s, sql)| CatalogTable::new(s.physical_table().clone(), sql.clone()))
.map(|(snapshot, sql_schema_name)| {
(
snapshot.physical_table().clone(),
Arc::clone(sql_schema_name),
)
})
.collect()
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/core/common/src/plan_visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,12 @@ pub fn find_cross_network_join(
let table_to_network: BTreeMap<DFTableReference, NetworkId> = catalog
.entries()
.iter()
.map(|t| {
.map(|(physical_table, sql_schema_name)| {
let table_ref = TableReference::Partial {
schema: Arc::new(t.sql_schema_name().to_owned()),
table: Arc::new(t.physical_table().table_name().clone()),
schema: Arc::new(sql_schema_name.to_string()),
table: Arc::new(physical_table.table_name().clone()),
};
(table_ref.into(), t.physical_table().network().clone())
(table_ref.into(), physical_table.network().clone())
})
.collect();

Expand Down
27 changes: 14 additions & 13 deletions crates/core/common/src/streaming_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ use crate::{
array::{RecordBatch, TimestampNanosecondArray},
datatypes::SchemaRef,
},
catalog::{
logical::LogicalTable,
physical::{Catalog, CatalogTable},
},
catalog::{logical::LogicalTable, physical::Catalog},
context::{
exec::{ExecContext, ExecContextBuilder},
plan::PlanContextBuilder,
Expand Down Expand Up @@ -322,7 +319,7 @@ pub struct StreamingQuery {
preserve_block_num: bool,
network: NetworkId,
/// `blocks` table for the network associated with the catalog.
blocks_table: CatalogTable,
blocks_table: (Arc<PhysicalTable>, Arc<str>),
/// The single-network cursor for the previously processed range. This may be provided by the
/// consumer (as a multi-network cursor) and converted to this single-network cursor.
prev_cursor: Option<NetworkCursor>,
Expand Down Expand Up @@ -552,11 +549,11 @@ impl StreamingQuery {
let blocks_ctx = {
// Construct a catalog for the single `blocks_table`.
let catalog = {
let table = &self.blocks_table;
let (physical_table, sql_schema_name) = &self.blocks_table;
let resolved_table = LogicalTable::new(
table.sql_schema_name().to_string(),
table.physical_table().dataset_reference().clone(),
table.physical_table().table().clone(),
sql_schema_name.to_string(),
physical_table.dataset_reference().clone(),
physical_table.table().clone(),
);
Catalog::new(
vec![resolved_table],
Expand Down Expand Up @@ -839,11 +836,12 @@ impl StreamingQuery {
let hash_constraint = hash
.map(|h| format!("AND hash = x'{}'", h.encode_hex()))
.unwrap_or_default();
let (blocks_physical_table, blocks_sql_schema_name) = &self.blocks_table;
let sql = format!(
"SELECT hash, parent_hash, timestamp FROM {} WHERE block_num = {} {} LIMIT 1",
TableReference::Partial {
schema: Arc::new(self.blocks_table.sql_schema_name().to_owned()),
table: Arc::new(self.blocks_table.physical_table().table_name().clone()),
schema: Arc::new(blocks_sql_schema_name.to_string()),
table: Arc::new(blocks_physical_table.table_name().clone()),
}
.to_quoted_string(),
number,
Expand Down Expand Up @@ -1189,7 +1187,7 @@ pub fn keep_alive_stream<'a>(
async fn resolve_blocks_table(
dataset: Arc<RawDataset>,
data_store: DataStore,
) -> Result<CatalogTable, ResolveBlocksTableError> {
) -> Result<(Arc<PhysicalTable>, Arc<str>), ResolveBlocksTableError> {
let table = dataset
.tables()
.iter()
Expand Down Expand Up @@ -1217,7 +1215,10 @@ async fn resolve_blocks_table(
table.clone(),
revision,
);
Ok(CatalogTable::new(physical_table.into(), sql_schema_name))
Ok((
Arc::new(physical_table),
Arc::from(sql_schema_name.as_str()),
))
}

/// Errors that occur when resolving the blocks table for a network
Expand Down
4 changes: 2 additions & 2 deletions crates/core/common/tests/it_session_async_resolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use amp_object_store::url::ObjectStoreUrl;
use amp_providers_registry::{ProviderConfigsStore, ProvidersRegistry};
use async_trait::async_trait;
use common::{
catalog::physical::{Catalog, CatalogTable},
catalog::physical::Catalog,
context::{
exec::ExecContextBuilder,
session::{SessionStateBuilder, is_user_input_error},
Expand Down Expand Up @@ -555,7 +555,7 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe
let catalog = Catalog::new(
vec![],
vec![],
vec![CatalogTable::new(physical_table, "test_schema".to_string())],
vec![(physical_table, Arc::from("test_schema"))],
Default::default(),
);

Expand Down
7 changes: 2 additions & 5 deletions crates/core/worker-datasets-raw/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ use amp_worker_core::{
};
use common::{
BlockNum,
catalog::{
logical::LogicalTable,
physical::{Catalog, CatalogTable},
},
catalog::{logical::LogicalTable, physical::Catalog},
parquet::errors::ParquetError,
physical_table::{MissingRangesError, PhysicalTable, segments::merge_ranges},
retryable::RetryableErrorExt as _,
Expand Down Expand Up @@ -224,7 +221,7 @@ pub async fn dump(
vec![],
tables
.iter()
.map(|(t, _)| CatalogTable::new(Arc::clone(t), sql_schema_name.clone()))
.map(|(t, _)| (Arc::clone(t), Arc::from(sql_schema_name.as_str())))
.collect(),
Default::default(),
);
Expand Down
Loading