Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion crates/core/common/src/context/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use datafusion::{
self,
arrow::array::RecordBatch,
catalog::{AsyncCatalogProvider as TableAsyncCatalogProvider, MemorySchemaProvider},
common::tree_node::Transformed,
datasource::{DefaultTableSource, TableType},
error::DataFusionError,
execution::{
RecordBatchStream, SendableRecordBatchStream, TaskContext,
Expand All @@ -20,7 +22,7 @@ use datafusion::{
memory_pool::{MemoryPool, human_readable_size},
object_store::ObjectStoreRegistry,
},
logical_expr::LogicalPlan,
logical_expr::{LogicalPlan, TableScan},
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{ExecutionPlan, displayable, execute_stream, stream::RecordBatchStreamAdapter},
sql::parser,
Expand Down Expand Up @@ -72,6 +74,15 @@ pub struct ExecContext {
}

impl ExecContext {
/// Binds a detached logical plan to this query context.
///
/// Every node of `plan` (subqueries included) is passed through
/// `attach_table_node`, which swaps eligible `TableScan` sources for
/// providers looked up on this context.
///
/// # Errors
///
/// Returns the first `DataFusionError` produced while rewriting a node.
#[tracing::instrument(skip_all, err)]
pub(crate) fn attach(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
    let rewritten = plan.transform_with_subqueries(|node| attach_table_node(node, self))?;
    // Only the rewritten plan is of interest; the "was anything changed" flag is dropped.
    Ok(rewritten.data)
}

/// Returns the physical catalog snapshot backing this query context.
///
/// Exposes segment-level data for streaming query consumers that need to
Expand Down Expand Up @@ -710,6 +721,30 @@ pub enum RegisterTableError {
RegisterUdf(#[source] DataFusionError),
}

/// Rewrites a single plan node, swapping a placeholder table source for a
/// provider obtained from `ctx`.
///
/// Only `TableScan` nodes whose source is a base table with no backing
/// logical plan are touched; every other node is handed back unchanged.
///
/// # Errors
///
/// Returns `DataFusionError::External` when the scan's table name cannot be
/// converted into a `TableReference<String>` or when `ctx.get_table` fails.
fn attach_table_node(
    mut node: LogicalPlan,
    ctx: &ExecContext,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
    if let LogicalPlan::TableScan(scan) = &mut node {
        // Scans backed by a logical plan or by a non-base table are left alone.
        let is_plain_base_table = scan.source.table_type() == TableType::Base
            && scan.source.get_logical_plan().is_none();

        if is_plain_base_table {
            let table_ref: TableReference<String> = scan
                .table_name
                .clone()
                .try_into()
                .map_err(|err| DataFusionError::External(Box::new(err)))?;
            let provider = ctx
                .get_table(&table_ref)
                .map_err(|err| DataFusionError::External(Box::new(err)))?;

            scan.source = Arc::new(DefaultTableSource::new(provider));
            return Ok(Transformed::yes(node));
        }
    }

    Ok(Transformed::no(node))
}

/// `logical_optimize` controls whether logical optimizations should be applied to `plan`.
#[tracing::instrument(skip_all, err)]
async fn execute_plan(
Expand Down
54 changes: 7 additions & 47 deletions crates/core/common/src/detached_logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
use std::sync::Arc;

use datafusion::{
common::{
DFSchemaRef,
tree_node::{Transformed, TreeNode, TreeNodeRecursion},
},
datasource::{DefaultTableSource, TableType},
error::DataFusionError,
logical_expr::{LogicalPlan, TableScan},
};
use datafusion::{common::DFSchemaRef, error::DataFusionError, logical_expr::LogicalPlan};

use crate::{
context::exec::ExecContext,
incrementalizer::NonIncrementalQueryError,
plan_visitors::{is_incremental, propagate_block_num},
sql::TableReference,
};

/// A plan that has `PlanTable` for its `TableProvider`s. It cannot be executed before being
Expand All @@ -28,35 +17,6 @@ impl DetachedLogicalPlan {
Self(plan)
}

/// Attaches this plan to a query context by replacing `PlanTable` providers
/// with actual `QueryableSnapshot` providers from the catalog.
///
/// Consumes the detached plan and walks it, subqueries included, rewriting
/// qualifying `TableScan` nodes in place.
///
/// # Errors
///
/// Returns `AttachPlanError` wrapping the underlying `DataFusionError` when a
/// scan's table name cannot be converted into a `TableReference<String>` or
/// when `ctx.get_table` fails for that reference.
#[tracing::instrument(skip_all, err)]
pub fn attach_to(self, ctx: &ExecContext) -> Result<LogicalPlan, AttachPlanError> {
Ok(self
.0
.transform_with_subqueries(|mut node| match &mut node {
// Only rewrite scans of base tables that are not backed by a logical
// plan; all other nodes pass through untouched.
LogicalPlan::TableScan(TableScan {
table_name, source, ..
}) if source.table_type() == TableType::Base
&& source.get_logical_plan().is_none() =>
{
// Convert the scan's table name into the project's reference type.
let table_ref: TableReference<String> = table_name
.clone()
.try_into()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Look up the provider for this table on the query context.
let provider = ctx
.get_table(&table_ref)
.map_err(|e| DataFusionError::External(e.into()))?;
// Swap the scan's source for one wrapping the looked-up provider.
*source = Arc::new(DefaultTableSource::new(provider));
Ok(Transformed::yes(node))
}
_ => Ok(Transformed::no(node)),
})
.map_err(AttachPlanError)?
.data)
}

/// Validates that the plan can be processed incrementally.
pub fn is_incremental(&self) -> Result<(), NonIncrementalQueryError> {
is_incremental(&self.0)
Expand All @@ -72,12 +32,12 @@ impl DetachedLogicalPlan {
Ok(Self(propagate_block_num(self.0)?))
}

/// Applies a visitor closure to each node in the logical plan tree.
pub fn apply<F>(&self, f: F) -> Result<TreeNodeRecursion, DataFusionError>
where
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion, DataFusionError>,
{
self.0.apply(f)
/// Attaches this plan to a query context, replacing `PlanTable` table
/// sources and `PlanJsUdf`-backed scalar functions with execution-ready
/// providers.
#[tracing::instrument(skip_all, err)]
pub fn attach_to(self, ctx: &ExecContext) -> Result<LogicalPlan, AttachPlanError> {
ctx.attach(self.0).map_err(AttachPlanError)
}
}

Expand Down