Skip to content

Commit 11841e0

Browse files
committed
[query] refactor: add DatabendQueryContext::build_table_from_source_plan()
1 parent 8a2440b commit 11841e0

File tree

6 files changed

+34
-40
lines changed

6 files changed

+34
-40
lines changed

common/meta/types/src/table_info.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,13 @@ impl Display for TableIdent {
5555
pub struct TableInfo {
5656
pub ident: TableIdent,
5757

58+
/// For a table it is `db_name.table_name`.
59+
/// For a table function, it is `table_name(args)`
5860
pub desc: String,
61+
62+
/// `name` is meant to be used with table-function.
63+
/// Table-function is identified by `name`.
64+
/// A table in the contrast, can only be identified by table-id.
5965
pub name: String,
6066

6167
/// The essential information about a table definition.

query/src/interpreters/plan_scheduler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ use common_tracing::tracing;
4848
use crate::api::BroadcastAction;
4949
use crate::api::FlightAction;
5050
use crate::api::ShuffleAction;
51-
use crate::catalogs::Catalog;
5251
use crate::catalogs::TablePtr;
5352
use crate::catalogs::ToReadDataSourcePlan;
5453
use crate::sessions::DatabendQueryContext;
@@ -784,14 +783,7 @@ impl PlanScheduler {
784783
}
785784

786785
fn visit_data_source(&mut self, plan: &ReadDataSourcePlan, _: &mut Tasks) -> Result<()> {
787-
let table = if plan.tbl_args.is_none() {
788-
let catalog = self.query_context.get_catalog();
789-
catalog.build_table(&plan.table_info)?
790-
} else {
791-
self.query_context
792-
.get_table_function(&plan.table_info.name, plan.tbl_args.clone())?
793-
.as_table()
794-
};
786+
let table = self.query_context.build_table_from_source_plan(plan)?;
795787

796788
match table.is_local() {
797789
true => self.visit_local_data_source(plan),

query/src/optimizers/optimizer_scatters.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use common_planners::SortPlan;
3232
use common_planners::StageKind;
3333
use common_planners::StagePlan;
3434

35-
use crate::catalogs::Catalog;
3635
use crate::optimizers::Optimizer;
3736
use crate::sessions::DatabendQueryContext;
3837
use crate::sessions::DatabendQueryContextRef;
@@ -281,17 +280,9 @@ impl PlanRewriter for ScattersOptimizerImpl {
281280
}
282281

283282
fn rewrite_read_data_source(&mut self, plan: &ReadDataSourcePlan) -> Result<PlanNode> {
284-
let context = self.ctx.clone();
285-
let select_table = if plan.tbl_args.is_none() {
286-
let catalog = context.get_catalog();
287-
catalog.build_table(&plan.table_info)?
288-
} else {
289-
context
290-
.get_table_function(&plan.table_info.name, plan.tbl_args.clone())?
291-
.as_table()
292-
};
283+
let t = self.ctx.build_table_from_source_plan(plan)?;
293284

294-
match select_table.is_local() {
285+
match t.is_local() {
295286
false => self.running_mode = RunningMode::Cluster,
296287
true => self.running_mode = RunningMode::Standalone,
297288
}

query/src/pipelines/transforms/transform_source.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use common_streams::ProgressStream;
2323
use common_streams::SendableDataBlockStream;
2424
use common_tracing::tracing;
2525

26-
use crate::catalogs::Catalog;
2726
use crate::pipelines::processors::EmptyProcessor;
2827
use crate::pipelines::processors::Processor;
2928
use crate::sessions::DatabendQueryContextRef;
@@ -42,16 +41,7 @@ impl SourceTransform {
4241
}
4342

4443
async fn read_table(&self) -> Result<SendableDataBlockStream> {
45-
let table = if self.source_plan.tbl_args.is_none() {
46-
let catalog = self.ctx.get_catalog();
47-
catalog.build_table(&self.source_plan.table_info)?
48-
} else {
49-
let func_meta = self.ctx.get_table_function(
50-
&self.source_plan.table_info.name,
51-
self.source_plan.tbl_args.clone(),
52-
)?;
53-
func_meta.as_table()
54-
};
44+
let table = self.ctx.build_table_from_source_plan(&self.source_plan)?;
5545

5646
// TODO(xp): get_single_node_table_io_context() or
5747
// get_cluster_table_io_context()?

query/src/sessions/context.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,17 @@ use common_meta_types::NodeInfo;
3333
use common_planners::Part;
3434
use common_planners::Partitions;
3535
use common_planners::PlanNode;
36+
use common_planners::ReadDataSourcePlan;
3637
use common_planners::Statistics;
3738
use common_streams::AbortStream;
3839
use common_streams::SendableDataBlockStream;
3940

4041
use crate::catalogs::impls::DatabaseCatalog;
4142
use crate::catalogs::Catalog;
4243
use crate::catalogs::Table;
43-
use crate::catalogs::TableFunction;
4444
use crate::clusters::ClusterRef;
4545
use crate::configs::Config;
4646
use crate::datasources::common::ContextDalBuilder;
47-
use crate::datasources::table_func_engine::TableArgs;
4847
use crate::sessions::context_shared::DatabendQueryContextShared;
4948
use crate::sessions::SessionManagerRef;
5049
use crate::sessions::Settings;
@@ -157,13 +156,25 @@ impl DatabendQueryContext {
157156
self.shared.get_table(database, table)
158157
}
159158

160-
pub fn get_table_function(
159+
/// Build a table instance the plan wants to operate on.
160+
///
161+
/// A plan just contains raw information about a table or table function.
162+
/// This method builds a `dyn Table`, which provides table specific io methods the plan needs.
163+
pub fn build_table_from_source_plan(
161164
&self,
162-
function_name: &str,
163-
tbl_args: TableArgs,
164-
) -> Result<Arc<dyn TableFunction>> {
165-
self.get_catalog()
166-
.get_table_function(function_name, tbl_args)
165+
plan: &ReadDataSourcePlan,
166+
) -> Result<Arc<dyn Table>> {
167+
let catalog = self.get_catalog();
168+
169+
let t = if plan.tbl_args.is_none() {
170+
catalog.build_table(&plan.table_info)?
171+
} else {
172+
catalog
173+
.get_table_function(&plan.table_info.name, plan.tbl_args.clone())?
174+
.as_table()
175+
};
176+
177+
Ok(t)
167178
}
168179

169180
pub fn get_id(&self) -> String {

query/src/sql/plan_parser.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use sqlparser::ast::Statement;
6565
use sqlparser::ast::TableFactor;
6666
use sqlparser::ast::UnaryOperator;
6767

68+
use crate::catalogs::Catalog;
6869
use crate::catalogs::ToReadDataSourcePlan;
6970
use crate::functions::ContextFunction;
7071
use crate::sessions::DatabendQueryContextRef;
@@ -788,7 +789,10 @@ impl PlanParser {
788789
},
789790
)?;
790791

791-
let table_func = self.ctx.get_table_function(&table_name, Some(table_args))?;
792+
let table_func = self
793+
.ctx
794+
.get_catalog()
795+
.get_table_function(&table_name, Some(table_args))?;
792796
meta_id = table_func.get_id();
793797
meta_version = table_func.get_table_info().ident.version;
794798
table = table_func.as_table();

0 commit comments

Comments
 (0)