Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions crates/core/common/src/catalog/logical/for_admin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub type TableReferencesMap = BTreeMap<
/// This function is used during derived dataset manifest validation to create a logical
/// catalog that can validate SQL queries against specific dataset versions.
///
/// ## Precondition
///
/// All dependency aliases referenced in `refs` (table and function references) must exist as keys
/// in `manifest_deps`. Callers must validate this before calling `create`; a missing alias is
/// treated as a caller bug and causes a panic instead of returning an error.
///
/// ## Process
///
/// 1. Flattens table references from the references map
Expand Down Expand Up @@ -128,12 +133,9 @@ async fn resolve_tables<'a>(
}
TableReference::Partial { schema, table } => {
// Schema is already parsed as DepAlias, lookup in dependencies map
let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| {
ResolveTablesError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: schema.as_ref().clone(),
}
})?;
let dataset_ref = manifest_deps
.get(schema.as_ref())
.expect("dep alias validated before catalog creation");

// Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
let Entry::Vacant(entry) = tables
Expand Down Expand Up @@ -217,12 +219,9 @@ async fn resolve_udfs<'a>(
match schema.as_ref() {
DepAliasOrSelfRef::DepAlias(dep_alias) => {
// External dependency reference - lookup in dependencies map
let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| {
ResolveUdfsError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: dep_alias.clone(),
}
})?;
let dataset_ref = manifest_deps
.get(dep_alias)
.expect("dep alias validated before catalog creation");

// Check vacancy BEFORE loading dataset
let Entry::Vacant(entry) = udfs
Expand Down Expand Up @@ -336,17 +335,6 @@ pub enum ResolveTablesError {
table_ref: String,
},

/// Dependency alias not found when processing table reference
#[error(
"In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The table being processed when the error occurred
table_name: TableName,
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for table reference
#[error("In table '{table_name}': Failed to retrieve dataset '{reference}'")]
GetDataset {
Expand Down Expand Up @@ -374,17 +362,6 @@ pub enum ResolveTablesError {

#[derive(Debug, thiserror::Error)]
pub enum ResolveUdfsError {
/// Dependency alias not found when processing function reference
#[error(
"In table '{table_name}': Dependency alias '{alias}' referenced in function but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The table being processed when the error occurred
table_name: TableName,
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for function
#[error("In table '{table_name}': Failed to retrieve dataset '{reference}' for function")]
GetDataset {
Expand Down
45 changes: 11 additions & 34 deletions crates/core/common/src/catalog/logical/for_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub type ResolvedReferences = (
/// This function builds a logical catalog with schemas only, enabling query plan generation
/// and schema inference without accessing physical parquet files.
///
/// ## Precondition
///
/// All dependency aliases referenced in `refs` (table and function references) must exist as keys
/// in `manifest_deps`. Callers must validate this before calling `create`; a missing alias is
/// treated as a caller bug and causes a panic instead of returning an error.
///
/// ## Where Used
///
/// This function is used during derived dataset dump execution when only logical validation
Expand Down Expand Up @@ -95,11 +100,9 @@ async fn resolve_tables(
}
TableReference::Partial { schema, table } => {
// Schema is already parsed as DepAlias, lookup in dependencies map
let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| {
ResolveTablesError::DependencyAliasNotFound {
alias: schema.as_ref().clone(),
}
})?;
let dataset_ref = manifest_deps
.get(schema.as_ref())
.expect("dep alias validated before catalog creation");

// Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
let Entry::Vacant(entry) = tables
Expand Down Expand Up @@ -179,11 +182,9 @@ async fn resolve_udfs(
match schema.as_ref() {
DepAliasOrSelfRef::DepAlias(dep_alias) => {
// External dependency reference - lookup in dependencies map
let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| {
ResolveUdfsError::DependencyAliasNotFound {
alias: dep_alias.clone(),
}
})?;
let dataset_ref = manifest_deps
.get(dep_alias)
.expect("dep alias validated before catalog creation");

// Check vacancy BEFORE loading dataset
let Entry::Vacant(entry) = udfs
Expand Down Expand Up @@ -307,18 +308,6 @@ pub enum ResolveTablesError {
table_ref: String,
},

/// Dependency alias not found when processing table reference.
///
/// This occurs when a table reference uses an alias that was not provided
/// in the dependencies map.
#[error(
"Dependency alias '{alias}' referenced in table reference but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for table reference.
///
/// This occurs when loading a dataset definition fails:
Expand Down Expand Up @@ -350,18 +339,6 @@ pub enum ResolveTablesError {
/// Errors that can occur when resolving UDF references with dependencies.
#[derive(Debug, thiserror::Error)]
pub enum ResolveUdfsError {
/// Dependency alias not found when processing function reference.
///
/// This occurs when a function reference uses an alias that was not provided
/// in the dependencies map.
#[error(
"Dependency alias '{alias}' referenced in function reference but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for function.
///
/// This occurs when loading a dataset definition for a function fails:
Expand Down
48 changes: 13 additions & 35 deletions crates/core/common/src/catalog/logical/for_manifest_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ pub type TableReferencesMap = BTreeMap<
/// Builds a unified logical catalog from table and function references across multiple tables,
/// resolving dependency aliases to datasets for schema-only validation (no physical data access).
///
/// Delegates to specialized helpers:
/// ## Precondition
///
/// All dependency aliases referenced in `refs` (table and function references) must exist as keys
/// in `manifest_deps`. Callers must validate this before calling `create`; a missing alias is
/// treated as a caller bug and causes a panic instead of returning an error.
///
/// ## Delegates to
///
/// - [`resolve_tables`] - Resolves table references to `LogicalTable` instances
/// - [`resolve_udfs`] - Resolves function references to UDFs
pub async fn create(
Expand Down Expand Up @@ -112,12 +118,9 @@ async fn resolve_tables<'a>(
});
}
TableReference::Partial { schema, table } => {
let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| {
ResolveTablesError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: schema.as_ref().clone(),
}
})?;
let dataset_ref = manifest_deps
.get(schema.as_ref())
.expect("dep alias validated before catalog creation");

let Entry::Vacant(entry) = tables
.entry(dataset_ref.hash().clone())
Expand Down Expand Up @@ -182,12 +185,9 @@ async fn resolve_udfs<'a>(
}
FunctionReference::Qualified { schema, function } => match schema.as_ref() {
DepAliasOrSelfRef::DepAlias(dep_alias) => {
let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| {
ResolveUdfsError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: dep_alias.clone(),
}
})?;
let dataset_ref = manifest_deps
.get(dep_alias)
.expect("dep alias validated before catalog creation");

// Check vacancy BEFORE loading dataset
let Entry::Vacant(entry) = udfs
Expand Down Expand Up @@ -292,17 +292,6 @@ pub enum ResolveTablesError {
table_ref: String,
},

/// Dependency alias not found when processing table reference
#[error(
"In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The table being processed when the error occurred
table_name: TableName,
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for table reference
#[error("In table '{table_name}': Failed to retrieve dataset '{reference}'")]
GetDataset {
Expand Down Expand Up @@ -330,17 +319,6 @@ pub enum ResolveTablesError {

#[derive(Debug, thiserror::Error)]
pub enum ResolveUdfsError {
/// Dependency alias not found when processing function reference
#[error(
"In table '{table_name}': Dependency alias '{alias}' referenced in function but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The table being processed when the error occurred
table_name: TableName,
/// The dependency alias that was not found in the dependencies map
alias: DepAlias,
},

/// Failed to retrieve dataset from store when loading dataset for function
#[error("In table '{table_name}': Failed to retrieve dataset '{reference}' for function")]
GetDataset {
Expand Down
48 changes: 37 additions & 11 deletions crates/services/admin-api/src/handlers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use common::{
exec_env::default_session_config,
metadata::{AmpMetadataFromParquetError, amp_metadata_from_parquet_file},
sql::{
ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references,
resolve_table_references,
FunctionReference, ResolveFunctionReferencesError, ResolveTableReferencesError,
TableReference, resolve_function_references, resolve_table_references,
},
};
use datafusion::sql::parser::Statement;
Expand Down Expand Up @@ -288,6 +288,18 @@ pub async fn validate_derived_manifest(
},
})?;

// Validate dependency aliases in table references before catalog creation
for table_ref in &table_refs {
if let TableReference::Partial { schema, .. } = table_ref
&& !dependencies.contains_key(schema.as_ref())
{
return Err(ManifestValidationError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: schema.to_string(),
});
}
}

// Extract function references (supports both external deps and self-references)
let func_refs = resolve_function_references::<DepAliasOrSelfRef>(&stmt).map_err(|err| {
ManifestValidationError::FunctionReferenceResolution {
Expand All @@ -296,6 +308,19 @@ pub async fn validate_derived_manifest(
}
})?;

// Validate dependency aliases in function references before catalog creation
for func_ref in &func_refs {
if let FunctionReference::Qualified { schema, .. } = func_ref
&& let DepAliasOrSelfRef::DepAlias(dep_alias) = schema.as_ref()
&& !dependencies.contains_key(dep_alias)
{
return Err(ManifestValidationError::DependencyAliasNotFound {
table_name: table_name.clone(),
alias: dep_alias.to_string(),
});
}
}

references.insert(table_name.clone(), (table_refs, func_refs));
statements.insert(table_name.clone(), stmt);
}
Expand Down Expand Up @@ -323,18 +348,12 @@ pub async fn validate_derived_manifest(
ResolveTablesError::UnqualifiedTable { .. } => {
ManifestValidationError::UnqualifiedTable(err)
}
ResolveTablesError::DependencyAliasNotFound { .. } => {
ManifestValidationError::DependencyAliasNotFound(err)
}
ResolveTablesError::GetDataset { .. } => ManifestValidationError::GetDataset(err),
ResolveTablesError::TableNotFoundInDataset { .. } => {
ManifestValidationError::TableNotFoundInDataset(err)
}
},
CreateLogicalCatalogError::ResolveUdfs(resolve_error) => match resolve_error {
ResolveUdfsError::DependencyAliasNotFound { .. } => {
ManifestValidationError::DependencyAliasNotFound(err)
}
ResolveUdfsError::GetDataset { .. } => ManifestValidationError::GetDataset(err),
ResolveUdfsError::EthCallUdfCreation { .. } => {
ManifestValidationError::EthCallUdfCreation(err)
Expand Down Expand Up @@ -512,9 +531,16 @@ pub enum ManifestValidationError {

/// Dependency alias not found
///
/// A table reference uses an alias that was not provided in the dependencies map.
#[error("Dependency alias not found: {0}")]
DependencyAliasNotFound(#[source] CreateLogicalCatalogError),
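/// A table or function reference uses an alias that was not provided in the dependencies map.
// NOTE(review): the message below always says "referenced in table", even when the missing alias
// comes from a function reference; consider wording that covers both cases.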
#[error(
"Dependency alias not found: In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies"
)]
DependencyAliasNotFound {
/// The table being processed when the error occurred
table_name: TableName,
/// The dependency alias that was not found
alias: String,
},

/// Non-incremental SQL operation in table query
///
Expand Down
Loading