Skip to content

Commit

Permalink
fix ingestion
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Feb 11, 2025
1 parent 7f9449b commit cd7ef16
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 21 deletions.
2 changes: 1 addition & 1 deletion optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::Result;
#[trait_variant::make(Send)]
pub trait Memoize: Send + Sync + 'static {
/// Creates or gets an optimization goal for a group with some required physical properties.
async fn create_or_get_goal(
async fn create_or_get_relation_group_goal(
&self,
group_id: RelationalGroupId,
required_physical_props: Vec<PhysicalProperty>,
Expand Down
2 changes: 1 addition & 1 deletion optd-core/src/storage/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl SqliteMemo {
}

impl Memoize for SqliteMemo {
async fn create_or_get_goal(
async fn create_or_get_relation_group_goal(
&self,
group_id: RelationalGroupId,
required_physical_props: Vec<PhysicalProperty>,
Expand Down
4 changes: 3 additions & 1 deletion optd-datafusion/sql/test_filter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ INSERT INTO departments VALUES
(2, 'Marketing');


SELECT * FROM employees WHERE id = 2;
explain SELECT * FROM employees WHERE id = 2;

SELECT * FROM employees WHERE id = 2;
4 changes: 3 additions & 1 deletion optd-datafusion/sql/test_join.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ INSERT INTO departments VALUES
(2, 'Marketing');


SELECT * FROM employees JOIN departments WHERE employees.department_id = departments.id;
explain SELECT * FROM employees INNER JOIN departments ON employees.department_id = departments.id;

SELECT * FROM employees INNER JOIN departments ON employees.department_id = departments.id;
4 changes: 3 additions & 1 deletion optd-datafusion/sql/test_scan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ INSERT INTO departments VALUES
(2, 'Marketing');


SELECT * FROM employees;
explain SELECT * FROM employees;

SELECT * FROM employees;
17 changes: 11 additions & 6 deletions optd-datafusion/src/converter/into_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::bail;
use datafusion::{
common::DFSchema,
logical_expr::{utils::conjunction, LogicalPlan as DatafusionLogicalPlan, Operator},
logical_expr::{utils::conjunction, LogicalPlan as DFLogicalPlan, Operator},
prelude::Expr,
};
use optd_core::{
Expand Down Expand Up @@ -89,18 +89,18 @@ impl ConversionContext<'_> {

pub fn conv_df_to_optd_relational(
&mut self,
df_logical_plan: &DatafusionLogicalPlan,
df_logical_plan: &DFLogicalPlan,
) -> anyhow::Result<Arc<LogicalPlan>> {
let operator = match df_logical_plan {
DatafusionLogicalPlan::Filter(df_filter) => LogicalOperator::Filter(Filter {
DFLogicalPlan::Filter(df_filter) => LogicalOperator::Filter(Filter {
child: self.conv_df_to_optd_relational(&df_filter.input)?,
predicate: Self::conv_df_to_optd_scalar(
&df_filter.predicate,
df_filter.input.schema(),
0,
)?,
}),
DatafusionLogicalPlan::Join(join) => {
DFLogicalPlan::Join(join) => {
let mut join_cond = Vec::new();
for (left, right) in &join.on {
let left = Self::conv_df_to_optd_scalar(left, join.left.schema(), 0)?;
Expand All @@ -110,6 +110,11 @@ impl ConversionContext<'_> {
operator: ScalarOperator::Equal(Equal { left, right }),
}));
}
if let Some(filter) = &join.filter {
let filter =
Self::conv_df_to_optd_scalar(filter, df_logical_plan.schema().as_ref(), 0)?;
join_cond.push(filter);
}
if join_cond.is_empty() {
join_cond.push(Arc::new(ScalarPlan {
operator: ScalarOperator::Constant(Constant {
Expand All @@ -125,7 +130,7 @@ impl ConversionContext<'_> {
Self::flatten_scalar_as_conjunction(join_cond, 0),
))
}
DatafusionLogicalPlan::TableScan(table_scan) => {
DFLogicalPlan::TableScan(table_scan) => {
let table_name = table_scan.table_name.to_quoted_string();

let combine_filters = conjunction(table_scan.filters.to_vec());
Expand All @@ -148,7 +153,7 @@ impl ConversionContext<'_> {

scan
}
DatafusionLogicalPlan::Projection(projection) => {
DFLogicalPlan::Projection(projection) => {
let input = self.conv_df_to_optd_relational(projection.input.as_ref())?;
let mut exprs = Vec::new();
for expr in &projection.expr {
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn run_queries(queries: String) -> Result<()> {
// Run our execution engine on the physical plan
let df_physical_plan = sql.clone().create_physical_plan().await?;
let plan = df_physical_plan.clone();
println!("{:#?}", df_physical_plan.clone());
// println!("{:#?}", df_physical_plan.clone());
// let df_physical_plan = df_physical_plan.children()[0].clone();
let mut print_results: Vec<RecordBatch> = vec![];
let now = SystemTime::now();
Expand Down
50 changes: 41 additions & 9 deletions optd-datafusion/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use anyhow::Ok;
use async_trait::async_trait;
use datafusion::{
execution::{context::QueryPlanner, SessionState},
logical_expr::LogicalPlan as DatafusionLogicalPlan,
physical_plan::ExecutionPlan,
logical_expr::{
Explain, LogicalPlan as DFLogicalPlan, PlanType as DFPlanType, ToStringifiedPlan,
},
physical_plan::{displayable, explain::ExplainExec, ExecutionPlan},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
};
use optd_core::{
Expand Down Expand Up @@ -115,30 +117,60 @@ impl OptdQueryPlanner {
/// datafusion.
async fn create_physical_plan_inner(
&self,
logical_plan: &DatafusionLogicalPlan,
logical_plan: &DFLogicalPlan,
session_state: &SessionState,
) -> anyhow::Result<Arc<dyn ExecutionPlan>> {
// Fall back to the datafusion planner for DML/DDL operations and empty relations; optd cannot handle these.
if let DatafusionLogicalPlan::Dml(_)
| DatafusionLogicalPlan::Ddl(_)
| DatafusionLogicalPlan::EmptyRelation(_) = logical_plan
if let DFLogicalPlan::Dml(_) | DFLogicalPlan::Ddl(_) | DFLogicalPlan::EmptyRelation(_) =
logical_plan
{
let planner = DefaultPhysicalPlanner::default();
return Ok(planner
.create_physical_plan(logical_plan, session_state)
.await?);
}

let (logical_plan, _verbose, mut explains) = match logical_plan {
DFLogicalPlan::Explain(Explain { plan, verbose, .. }) => {
(plan.as_ref(), *verbose, Some(Vec::new()))
}
_ => (logical_plan, false, None),
};

if let Some(explains) = &mut explains {
explains.push(
logical_plan.to_stringified(DFPlanType::OptimizedLogicalPlan {
optimizer_name: "datafusion".to_string(),
}),
);
}

let mut converter = ConversionContext::new(session_state);
// convert the logical plan to optd
let logical_plan = converter.conv_df_to_optd_relational(logical_plan)?;
// run the optd optimizer
let optd_optimized_physical_plan = self.optimizer.mock_optimize(&logical_plan).await?;
// convert the optd physical plan back to a datafusion physical plan
converter
let physical_plan = converter
.conv_optd_to_df_relational(&optd_optimized_physical_plan)
.await
.map_err(|e| anyhow::anyhow!(e))
.map_err(|e| anyhow::anyhow!(e))?;

if let Some(explains) = &mut explains {
explains.push(
displayable(&*physical_plan).to_stringified(false, DFPlanType::FinalPhysicalPlan),
);
}

if let Some(explains) = explains {
Ok(Arc::new(ExplainExec::new(
DFLogicalPlan::explain_schema(),
explains,
true,
)))
} else {
Ok(physical_plan)
}
}
}

Expand Down Expand Up @@ -170,7 +202,7 @@ impl QueryPlanner for OptdQueryPlanner {
/// Also see [`OptdQueryPlanner::create_physical_plan`]
async fn create_physical_plan(
&self,
datafusion_logical_plan: &DatafusionLogicalPlan,
datafusion_logical_plan: &DFLogicalPlan,
session_state: &SessionState,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
self.create_physical_plan_inner(datafusion_logical_plan, session_state)
Expand Down

0 comments on commit cd7ef16

Please sign in to comment.