Skip to content

minor(sqlparser): encapsulate PlannerContext, reduce some clones #5814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
Err(_) => {
// check the outer_query_schema and try to find a match
let outer_query_schema_opt =
planner_context.outer_query_schema.as_ref();
if let Some(outer) = outer_query_schema_opt {
if let Some(outer) = planner_context.outer_query_schema() {
match outer.field_with_unqualified_name(normalize_ident.as_str())
{
Ok(field) => {
Expand Down Expand Up @@ -159,10 +157,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Unsupported compound identifier: {ids:?}"
)))
} else {
let outer_query_schema_opt =
planner_context.outer_query_schema.as_ref();
// check the outer_query_schema and try to find a match
if let Some(outer) = outer_query_schema_opt {
if let Some(outer) = planner_context.outer_query_schema() {
let search_result = search_dfschema(&ids, outer);
match search_result {
// found matching field with spare identifier(s) for nested field(s) in structure
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<Expr> {
match sql {
SQLExpr::Value(value) => {
self.parse_value(value, &planner_context.prepare_param_data_types)
self.parse_value(value, planner_context.prepare_param_data_types())
}
SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::DatePart,
Expand Down
18 changes: 9 additions & 9 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
Ok(Expr::Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -52,11 +52,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
let expr = Box::new(self.sql_to_expr(expr, input_schema, planner_context)?);
Ok(Expr::InSubquery {
expr,
Expand All @@ -74,11 +74,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
Expand Down
65 changes: 52 additions & 13 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ impl Default for ParserOptions {
}
}

#[derive(Debug, Clone)]
/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
/// Parameter Data Types provided with PREPARE statement and the query schema of the
/// outer query plan
#[derive(Debug, Clone)]
pub struct PlannerContext {
/// Data type provided with prepare statement
pub prepare_param_data_types: Vec<DataType>,
/// Map of CTE name to logical plan of the WITH clause
pub ctes: HashMap<String, LogicalPlan>,
/// Data types for numbered parameters ($1, $2, etc), if supplied
/// in `PREPARE` statement
prepare_param_data_types: Vec<DataType>,
/// Map of CTE name to logical plan of the WITH clause.
/// Use Arc<LogicalPlan> to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
/// The query schema of the outer query plan, used to resolve the columns in subquery
pub outer_query_schema: Option<DFSchema>,
outer_query_schema: Option<DFSchema>,
}

impl Default for PlannerContext {
Expand All @@ -100,15 +102,52 @@ impl PlannerContext {
}
}

/// Create a new PlannerContext with provided prepare_param_data_types
pub fn new_with_prepare_param_data_types(
/// Update the PlannerContext with provided prepare_param_data_types
pub fn with_prepare_param_data_types(
mut self,
prepare_param_data_types: Vec<DataType>,
) -> Self {
Self {
prepare_param_data_types,
ctes: HashMap::new(),
outer_query_schema: None,
}
self.prepare_param_data_types = prepare_param_data_types;
self
}

/// Returns a reference to the outer query's schema, or `None` if no
/// outer query schema is currently set.
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
self.outer_query_schema.as_ref()
}

/// sets the outer query schema, returning the existing one, if
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think accessors are also a good place to put documentation about what they do.

/// any
///
/// The previous value is moved out rather than cloned, so a caller can
/// keep it and pass it back to this method later to restore the prior
/// state.
pub fn set_outer_query_schema(
    &mut self,
    schema: Option<DFSchema>,
) -> Option<DFSchema> {
    // Store the new schema and return whatever was stored before.
    std::mem::replace(&mut self.outer_query_schema, schema)
}

/// Borrows the data types recorded for the numbered parameters
/// (`$1`, `$2`, etc) as a slice.
pub fn prepare_param_data_types(&self) -> &[DataType] {
    self.prepare_param_data_types.as_slice()
}

/// Returns `true` if a plan is stored under `cte_name` as a Common Table
/// Expression (CTE) / Subquery.
pub fn contains_cte(&self, cte_name: &str) -> bool {
self.ctes.contains_key(cte_name)
}

/// Stores `plan` as the Common Table Expression (CTE) / Subquery named
/// `cte_name`.
///
/// The plan is wrapped in an `Arc` before being placed in the map. Any
/// plan already stored under the same name is replaced.
pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
    self.ctes.insert(cte_name.into(), Arc::new(plan));
}

/// Looks up the plan stored for the Common Table Expression (CTE) /
/// Subquery named `cte_name`, returning `None` when no such entry exists.
pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
    let shared_plan = self.ctes.get(cte_name)?;
    // Hand out a plain reference; the `Arc` wrapper stays internal.
    Some(shared_plan.as_ref())
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = normalize_ident(cte.alias.name.clone());
if planner_context.ctes.contains_key(&cte_name) {
if planner_context.contains_cte(&cte_name) {
return Err(DataFusionError::SQL(ParserError(format!(
"WITH query name {cte_name:?} specified more than once"
))));
Expand All @@ -68,7 +68,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2").
let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;

planner_context.ctes.insert(cte_name, logical_plan);
planner_context.insert_cte(cte_name, logical_plan);
}
}
let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// normalize name and alias
let table_ref = self.object_name_to_table_reference(name)?;
let table_name = table_ref.to_string();
let cte = planner_context.ctes.get(&table_name);
let cte = planner_context.get_cte(&table_name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the use of ctes -- and 4 lines below it is immediately clone()d

(
match (
cte,
Expand Down
12 changes: 5 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema.clone();
let outer_query_schema_vec =
if let Some(outer) = outer_query_schema.as_ref() {
vec![outer]
} else {
vec![]
};
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, planner_context),
SetExpr::Values(v) => {
self.sql_values_to_plan(v, &planner_context.prepare_param_data_types)
self.sql_values_to_plan(v, planner_context.prepare_param_data_types())
}
SetExpr::SetOperation {
op,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<_>>()?;

// Create planner context with parameters
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(data_types.clone());
let mut planner_context = PlannerContext::new()
.with_prepare_param_data_types(data_types.clone());

// Build logical plan for inner statement of the prepare statement
let plan = self.sql_statement_to_plan_with_context(
Expand Down Expand Up @@ -898,7 +898,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Projection
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
let source = self.query_to_plan(*source, &mut planner_context)?;
if fields.len() != source.schema().fields().len() {
Err(DataFusionError::Plan(
Expand Down