Commit 0d94f92: fix clippy

Signed-off-by: Yuchen Liang <[email protected]>
yliang412 committed Feb 11, 2025
1 parent a806f07 commit 0d94f92
Showing 9 changed files with 60 additions and 94 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -7,4 +7,6 @@ resolver = "2"
 anyhow = "1.0"
 trait-variant = "0.1.2"
 async-recursion = "1.1.1"
-tokio = { version = "1.43.0", features = ["full"] }
+tokio = { version = "1.43.0", features = ["full"] }
+# Pin more recent versions for `-Zminimal-versions`.
+proc-macro2 = "1.0.60" # For a missing feature (https://github.com/rust-lang/rust/issues/113152).
3 changes: 1 addition & 2 deletions optd-core/Cargo.toml
@@ -4,12 +4,11 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-# Pin more recent versions for `-Zminimal-versions`.
-proc-macro2 = "1.0.60" # For a missing feature (https://github.com/rust-lang/rust/issues/113152).
 tokio.workspace = true
 anyhow.workspace = true
 async-recursion.workspace = true
 trait-variant.workspace = true
+proc-macro2.workspace = true
 sqlx = { version = "0.8", features = [ "sqlite", "runtime-tokio", "migrate" ] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1", features = ["raw_value"] }
6 changes: 3 additions & 3 deletions optd-core/src/cascades/mod.rs
@@ -142,16 +142,16 @@ async fn mock_optimize_relation_expr(
         LogicalExpression::Scan(scan) => {
             let physical_expr = PhysicalExpression::TableScan(TableScan {
                 table_name: scan.table_name.clone(),
-                predicate: scan.predicate.clone(),
+                predicate: scan.predicate,
             });
             memo.add_physical_expr_to_group(&physical_expr, group_id)
                 .await?;
             mock_optimize_scalar_group(memo, scan.predicate).await?;
         }
         LogicalExpression::Filter(filter) => {
             let physical_expr = PhysicalExpression::Filter(PhysicalFilter {
-                child: filter.child.clone(),
-                predicate: filter.predicate.clone(),
+                child: filter.child,
+                predicate: filter.predicate,
             });
             memo.add_physical_expr_to_group(&physical_expr, group_id)
                 .await?;
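Note: the three dropped `.clone()` calls are the usual fix for clippy's redundant-clone warnings. A minimal, self-contained sketch of the pattern (illustrative names, not from this repo; `GroupId` stands in for whatever type `predicate` has here):

#[derive(Clone, Copy)]
struct GroupId(u64);

struct Scan {
    table_name: String,
    predicate: GroupId,
}

// Before: (scan.table_name.clone(), scan.predicate.clone())
// `scan` is moved into the function, so its fields can be moved out, and
// `.clone()` on a `Copy` value is flagged by clippy::clone_on_copy.
fn split(scan: Scan) -> (String, GroupId) {
    (scan.table_name, scan.predicate)
}

fn main() {
    let (name, pred) = split(Scan { table_name: "t1".into(), predicate: GroupId(7) });
    assert_eq!(name, "t1");
    assert_eq!(pred.0, 7);
}

Observe that `scan.predicate` is still used after being moved into the struct literal (in `mock_optimize_scalar_group(memo, scan.predicate)`), which suggests the predicate type is `Copy`; otherwise the original `.clone()` would be required.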
9 changes: 5 additions & 4 deletions optd-datafusion/Cargo.toml
@@ -4,13 +4,14 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
+optd-core = { path = "../optd-core" }
 anyhow.workspace = true
+tokio.workspace = true
+trait-variant.workspace = true
+async-recursion.workspace = true
+proc-macro2.workspace = true
 async-trait = "0.1.85"
 datafusion = "45.0.0"
 futures = "0.3.31"
 itertools = "0.14.0"
-tokio.workspace = true
-trait-variant.workspace = true
-optd-core = { path = "../optd-core" }
-async-recursion.workspace = true
 
34 changes: 13 additions & 21 deletions optd-datafusion/src/converter/from_optd.rs
@@ -45,9 +45,8 @@ impl ConversionContext<'_> {
             }
             PhysicalOperator::Filter(filter) => {
                 let input_exec = self.conv_optd_to_df_relational(&filter.child).await?;
-                let physical_expr = self
-                    .conv_optd_to_df_scalar(&filter.predicate, &input_exec.schema())
-                    .clone();
+                let physical_expr =
+                    Self::conv_optd_to_df_scalar(&filter.predicate, &input_exec.schema()).clone();
                 Ok(
                     Arc::new(datafusion::physical_plan::filter::FilterExec::try_new(
                         physical_expr,
@@ -59,12 +58,9 @@
                 let input_exec = self.conv_optd_to_df_relational(&project.child).await?;
                 let physical_exprs = project
                     .fields
-                    .to_vec()
-                    .into_iter()
-                    .map(|field| {
-                        self.conv_optd_to_df_scalar(&field, &input_exec.schema())
-                            .clone()
-                    })
+                    .iter()
+                    .cloned()
+                    .map(|field| Self::conv_optd_to_df_scalar(&field, &input_exec.schema()))
                     .enumerate()
                     .map(|(idx, expr)| (expr, format!("col{}", idx)))
                     .collect::<Vec<(Arc<dyn PhysicalExpr>, String)>>();
@@ -89,7 +85,7 @@
                 };
 
                 let physical_expr =
-                    self.conv_optd_to_df_scalar(&join.condition, &Arc::new(filter_schema.clone()));
+                    Self::conv_optd_to_df_scalar(&join.condition, &Arc::new(filter_schema.clone()));
 
                 let join_type = JoinType::from_str(join.join_type.as_str().unwrap())?;
 
@@ -126,11 +122,7 @@
         }
     }
 
-    pub fn conv_optd_to_df_scalar(
-        &self,
-        pred: &ScalarPlan,
-        context: &SchemaRef,
-    ) -> Arc<dyn PhysicalExpr> {
+    pub fn conv_optd_to_df_scalar(pred: &ScalarPlan, context: &SchemaRef) -> Arc<dyn PhysicalExpr> {
         match &pred.operator {
             ScalarOperator::ColumnRef(column_ref) => {
                 let idx = column_ref.column_index.as_i64().unwrap() as usize;
@@ -149,20 +141,20 @@
                 Arc::new(Literal::new(value))
             }
             ScalarOperator::And(and) => {
-                let left = self.conv_optd_to_df_scalar(&and.left, context);
-                let right = self.conv_optd_to_df_scalar(&and.right, context);
+                let left = Self::conv_optd_to_df_scalar(&and.left, context);
+                let right = Self::conv_optd_to_df_scalar(&and.right, context);
                 let op = Operator::And;
                 Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
             }
             ScalarOperator::Add(add) => {
-                let left = self.conv_optd_to_df_scalar(&add.left, context);
-                let right = self.conv_optd_to_df_scalar(&add.right, context);
+                let left = Self::conv_optd_to_df_scalar(&add.left, context);
+                let right = Self::conv_optd_to_df_scalar(&add.right, context);
                 let op = Operator::Plus;
                 Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
             }
             ScalarOperator::Equal(equal) => {
-                let left = self.conv_optd_to_df_scalar(&equal.left, context);
-                let right = self.conv_optd_to_df_scalar(&equal.right, context);
+                let left = Self::conv_optd_to_df_scalar(&equal.left, context);
+                let right = Self::conv_optd_to_df_scalar(&equal.right, context);
                 let op = Operator::Eq;
                 Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
             }
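Note: the recurring rewrite in this file, `self.conv_optd_to_df_scalar(...)` to `Self::conv_optd_to_df_scalar(...)` with the `&self` receiver removed, is the standard fix for clippy's `unused_self` lint: a method whose body never touches `self` becomes a plain associated function. A minimal sketch (illustrative names, not from this repo):

struct Converter;

impl Converter {
    // Before: fn double(&self, x: i64) -> i64 { x * 2 }  // `self` never used
    // After: no receiver; callers write Self::double(x) or Converter::double(x).
    fn double(x: i64) -> i64 {
        x * 2
    }
}

fn main() {
    assert_eq!(Converter::double(21), 42);
}

The projection hunk additionally swaps `.to_vec().into_iter()` for `.iter().cloned()`, avoiding an intermediate Vec allocation (the pattern clippy's `unnecessary_to_owned` flags).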
56 changes: 27 additions & 29 deletions optd-datafusion/src/converter/into_optd.rs
@@ -25,7 +25,6 @@ use super::ConversionContext;
 impl ConversionContext<'_> {
     /// The col_offset is an offset added to the column index for all column references. It is useful for joins.
     pub fn conv_df_to_optd_scalar(
-        &self,
        df_expr: &Expr,
         context: &DFSchema,
         col_offset: usize,
@@ -39,20 +38,20 @@
             Expr::Literal(scalar_value) => match scalar_value {
                 datafusion::scalar::ScalarValue::Boolean(val) => {
                     ScalarOperator::Constant(Constant {
-                        value: OptdValue::Bool(val.clone().unwrap()),
+                        value: OptdValue::Bool((*val).unwrap()),
                     })
                 }
                 datafusion::scalar::ScalarValue::Int64(val) => {
-                    ScalarOperator::Constant(Constant::new(OptdValue::Int64(val.clone().unwrap())))
+                    ScalarOperator::Constant(Constant::new(OptdValue::Int64((*val).unwrap())))
                 }
                 datafusion::scalar::ScalarValue::Utf8(val) => {
                     ScalarOperator::Constant(Constant::new(OptdValue::String(val.clone().unwrap())))
                 }
-                _ => panic!("OptD Only supports a limited number of literals"),
+                _ => panic!("optd Only supports a limited number of literals"),
             },
             Expr::BinaryExpr(binary_expr) => {
-                let left = self.conv_df_to_optd_scalar(&binary_expr.left, context, col_offset)?;
-                let right = self.conv_df_to_optd_scalar(&binary_expr.right, context, col_offset)?;
+                let left = Self::conv_df_to_optd_scalar(&binary_expr.left, context, col_offset)?;
+                let right = Self::conv_df_to_optd_scalar(&binary_expr.right, context, col_offset)?;
                 match binary_expr.op {
                     Operator::Plus => ScalarOperator::Add(Add { left, right }),
                     // Operator::And => ScalarOperator::And(Add { left, right }),
@@ -61,7 +60,7 @@
                 }
             }
             Expr::Cast(cast) => {
-                return self.conv_df_to_optd_scalar(&cast.expr, context, col_offset);
+                return Self::conv_df_to_optd_scalar(&cast.expr, context, col_offset);
             }
             _ => panic!(
                 "optd does not support this scalar expression: {:#?}",
@@ -93,23 +92,20 @@
         df_logical_plan: &DatafusionLogicalPlan,
     ) -> anyhow::Result<Arc<LogicalPlan>> {
         let operator = match df_logical_plan {
-            DatafusionLogicalPlan::Filter(df_filter) => {
-                let filter = LogicalOperator::Filter(Filter {
-                    child: self.conv_df_to_optd_relational(&df_filter.input)?,
-                    predicate: self.conv_df_to_optd_scalar(
-                        &df_filter.predicate,
-                        df_filter.input.schema(),
-                        0,
-                    )?,
-                });
-                filter
-            }
+            DatafusionLogicalPlan::Filter(df_filter) => LogicalOperator::Filter(Filter {
+                child: self.conv_df_to_optd_relational(&df_filter.input)?,
+                predicate: Self::conv_df_to_optd_scalar(
+                    &df_filter.predicate,
+                    df_filter.input.schema(),
+                    0,
+                )?,
+            }),
             DatafusionLogicalPlan::Join(join) => {
                 let mut join_cond = Vec::new();
                 for (left, right) in &join.on {
-                    let left = self.conv_df_to_optd_scalar(left, join.left.schema(), 0)?;
+                    let left = Self::conv_df_to_optd_scalar(left, join.left.schema(), 0)?;
                     let offset = join.left.schema().fields().len();
-                    let right = self.conv_df_to_optd_scalar(right, join.right.schema(), offset)?;
+                    let right = Self::conv_df_to_optd_scalar(right, join.right.schema(), offset)?;
                     join_cond.push(Arc::new(ScalarPlan {
                         operator: ScalarOperator::Equal(Equal { left, right }),
                     }));
@@ -122,13 +118,12 @@
                     }));
                 }
 
-                let join = LogicalOperator::Join(Join::new(
+                LogicalOperator::Join(Join::new(
                     &join.join_type.to_string(),
                     self.conv_df_to_optd_relational(&join.left)?,
                     self.conv_df_to_optd_relational(&join.right)?,
                     Self::flatten_scalar_as_conjunction(join_cond, 0),
-                ));
-                join
+                ))
             }
             DatafusionLogicalPlan::TableScan(table_scan) => {
                 let table_name = table_scan.table_name.to_quoted_string();
@@ -139,7 +134,7 @@
                 match combine_filters {
                     Some(df_expr) => {
                         let schema = DFSchema::try_from(table_scan.source.schema()).unwrap();
-                        self.conv_df_to_optd_scalar(&df_expr, &schema, 0)?
+                        Self::conv_df_to_optd_scalar(&df_expr, &schema, 0)?
                     }
                     None => Arc::new(ScalarPlan {
                         operator: ScalarOperator::Constant(Constant {
@@ -157,14 +152,17 @@
                 let input = self.conv_df_to_optd_relational(projection.input.as_ref())?;
                 let mut exprs = Vec::new();
                 for expr in &projection.expr {
-                    exprs.push(self.conv_df_to_optd_scalar(expr, projection.input.schema(), 0)?);
+                    exprs.push(Self::conv_df_to_optd_scalar(
+                        expr,
+                        projection.input.schema(),
+                        0,
+                    )?);
                 }
-                let project = LogicalOperator::Project(Project {
+
+                LogicalOperator::Project(Project {
                     child: input,
                     fields: exprs,
-                });
-
-                project
+                })
             }
             _ => bail!("optd does not support this operator"),
         };
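Note: besides the same `self` -> `Self` receiver cleanup, the Filter, Join, and Project arms here drop a binding that was returned immediately, the fix for clippy's `let_and_return`, and the literal arms replace `val.clone().unwrap()` with `(*val).unwrap()` because `Option<bool>` and `Option<i64>` are `Copy`. A minimal sketch of the binding cleanup (illustrative only):

fn make_pair() -> (i32, i32) {
    // Before (flagged by clippy::let_and_return):
    //     let pair = (1, 2);
    //     pair
    (1, 2)
}

fn main() {
    assert_eq!(make_pair(), (1, 2));
}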
2 changes: 1 addition & 1 deletion optd-datafusion/src/converter/mod.rs
@@ -6,9 +6,9 @@ pub mod from_optd;
 pub mod into_optd;
 
 /// A context for converting between optd and datafusion.
 /// It stores a map from table names to table sources and a session state.
-/// The map is used to lookup table sources when converting TableScan operators from optd to datafusion.
 pub struct ConversionContext<'a> {
+    /// Maps table names to table sources.
     pub tables: HashMap<String, Arc<dyn TableSource>>,
     pub session_state: &'a SessionState,
 }
39 changes: 6 additions & 33 deletions optd-datafusion/src/planner.rs
@@ -50,33 +50,6 @@ impl OptdOptimizer {
     ///
     /// # Returns
     /// * `PhysicalPlan` - The optimized physical plan.
-    // pub fn mock_optimize(&self, logical_plan: &LogicalPlan) -> Arc<PhysicalPlan> {
-    //     let operator = match &logical_plan.operator {
-    //         LogicalOperator::Scan(scan) => PhysicalOperator::TableScan(TableScan {
-    //             table_name: scan.table_name.clone(),
-    //             predicate: scan.predicate.clone(),
-    //         }),
-    //         LogicalOperator::Filter(filter) => PhysicalOperator::Filter(PhysicalFilter {
-    //             child: self.mock_optimize(&filter.child),
-    //             predicate: filter.predicate.clone(),
-    //         }),
-    //         LogicalOperator::Project(_project) => {
-    //             Arc::new(PhysicalOperator::Project(PhysicalProject {
-    //                 child: self.mock_optimize(project.child.clone()),
-    //                 fields: project.fields.clone(),
-    //             }))
-    //             todo!()
-    //         }
-    //         LogicalOperator::Join(join) => PhysicalOperator::NestedLoopJoin(NestedLoopJoin {
-    //             join_type: join.join_type.clone(),
-    //             outer: self.mock_optimize(&join.left),
-    //             inner: self.mock_optimize(&join.right),
-    //             condition: join.condition.clone(),
-    //         }),
-    //     };
-    //     Arc::new(PhysicalPlan { operator })
-    // }
-
     pub async fn mock_optimize(
         &self,
         logical_plan: &LogicalPlan,
@@ -94,9 +67,9 @@
 
 /// A struct that implements the `QueryPlanner` trait for the `OptdQueryPlanner`.
 /// This trait is used to create a physical plan for a given logical plan.
-/// The physical plan is created by converting the logical plan to an OptD logical plan,
+/// The physical plan is created by converting the logical plan to an optd logical plan,
 /// and then running the optd optimizer on the logical plan and then converting it back.
-/// This is the entry point for OptD.
+/// This is the entry point for optd.
 #[derive(Debug)]
 pub struct OptdQueryPlanner {
     pub optimizer: Arc<OptdOptimizer>,
@@ -127,7 +100,7 @@
     ///
     /// 1. Check if the logical plan is a DML/DDL operation. If it is, fall back
     ///    to the datafusion planner.
-    /// 2. Convert the logical plan to an OptD logical plan.
+    /// 2. Convert the logical plan to an optd logical plan.
     /// 3. Run the optd optimizer on the logical plan.
     /// 4. Convert the physical plan to a physical plan that can be executed by
     ///    datafusion.
@@ -157,11 +130,11 @@
         }
 
         let mut converter = ConversionContext::new(session_state);
-        // convert the logical plan to OptD
+        // convert the logical plan to optd
         let logical_plan = converter.conv_df_to_optd_relational(logical_plan)?;
         // run the optd optimizer
         let optd_optimized_physical_plan = self.optimizer.mock_optimize(&logical_plan).await?;
-        // convert the physical plan to OptD
+        // convert the physical plan to optd
         converter
             .conv_optd_to_df_relational(&optd_optimized_physical_plan)
             .await
@@ -180,7 +153,7 @@
     ///
     /// 1. Check if the logical plan is a DML/DDL operation. If it is, fall back
     ///    to the datafusion planner.
-    /// 2. Convert the logical plan to an OptD logical plan.
+    /// 2. Convert the logical plan to an optd logical plan.
     /// 3. Run the optd optimizer on the logical plan.
     /// 4. Convert the physical plan to a physical plan that can be executed by
     ///    datafusion.
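Note: the large deletion at the top of this file removes a commented-out, non-async draft of `mock_optimize` that the `async` version kept below had superseded; the draft remains recoverable from git history, so deleting it outright is the cleaner fix. The remaining hunks only normalize doc-comment casing (`OptD` -> `optd`).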
