Skip to content

Commit

Permalink
Change join condition to match datafusion and fix type errors from pr…
Browse files Browse the repository at this point in the history
…ev commit
  • Loading branch information
SarveshOO7 committed Jan 28, 2025
1 parent 0bc6a13 commit cbfbb88
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 49 deletions.
81 changes: 34 additions & 47 deletions infra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ use types::operator::logical::{
use types::operator::physical::{
HashJoinOperator, PhysicalFilterOperator, PhysicalOperator, TableScanOperator,
};
use types::operator::ScalarOperator;
use types::plan::logical_plan::{LogicalLink, LogicalPlan as OptDLogicalPlan, ScalarLink};
use types::plan::partial_physical_plan::PhysicalLink;
use types::plan::physical_plan::PhysicalPlan;
use types::operator::Scalar;
use types::plan::logical_plan::{LogicalLink, LogicalPlan as OptDLogicalPlan};
use types::plan::physical_plan::{PhysicalLink, PhysicalPlan};

struct OptdOptimizer {}

Expand All @@ -49,44 +48,27 @@ impl OptdOptimizer {
) -> Arc<PhysicalOperator<PhysicalLink>> {
match &*logical_node {
LogicalOperator::Scan(logical_scan_operator) => {
Arc::new(PhysicalOperator::TableScan(TableScanOperator::<
PhysicalLink,
> {
Arc::new(PhysicalOperator::TableScan(TableScanOperator {
table_name: logical_scan_operator.table_name.clone(),
predicate: None,
}))
}
LogicalOperator::Filter(logical_filter_operator) => {
let LogicalLink::LogicalNode(ref child) = logical_filter_operator.child else {
panic!("The child of filter is not a logical node")
};

let LogicalLink::ScalarNode(ref predicate) = logical_filter_operator.predicate
else {
panic!("The predicate of filter is not a scalar node")
};
let LogicalLink::LogicalNode(ref child) = logical_filter_operator.child;
let predicate = logical_filter_operator.predicate.clone();
Arc::new(PhysicalOperator::Filter(PhysicalFilterOperator::<
PhysicalLink,
> {
child: PhysicalLink::PhysicalNode(Self::conv_logical_to_physical(
child.clone(),
)),
predicate: PhysicalLink::ScalarNode(todo!()),
predicate: predicate,

Check warning on line 65 in infra/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] infra/src/lib.rs#L65

warning: redundant field names in struct initialization --> infra/src/lib.rs:65:21 | 65 | predicate: predicate, | ^^^^^^^^^^^^^^^^^^^^ help: replace it with: `predicate` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names = note: `#[warn(clippy::redundant_field_names)]` on by default
Raw output
infra/src/lib.rs:65:21:w:warning: redundant field names in struct initialization
  --> infra/src/lib.rs:65:21
   |
65 |                     predicate: predicate,
   |                     ^^^^^^^^^^^^^^^^^^^^ help: replace it with: `predicate`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names
   = note: `#[warn(clippy::redundant_field_names)]` on by default


__END__
}))
}
LogicalOperator::Join(logical_join_operator) => {
let LogicalLink::LogicalNode(ref left_join) = logical_join_operator.left else {
panic!("The left child of join is not a logical node")
};

let LogicalLink::LogicalNode(ref right_join) = logical_join_operator.right else {
panic!("The right child of join is not a logical node")
};

let LogicalLink::ScalarNode(ref condition) = logical_join_operator.condition else {
panic!("The condition child of join is not a Scalar Node")
};

let LogicalLink::LogicalNode(ref left_join) = logical_join_operator.left;
let LogicalLink::LogicalNode(ref right_join) = logical_join_operator.right;
let condition = logical_join_operator.condition.clone();
Arc::new(PhysicalOperator::HashJoin(
HashJoinOperator::<PhysicalLink> {
join_type: (),
Expand All @@ -96,7 +78,7 @@ impl OptdOptimizer {
right: PhysicalLink::PhysicalNode(Self::conv_logical_to_physical(
right_join.clone(),
)),
condition: PhysicalLink::ScalarNode(todo!()),
condition: condition,

Check warning on line 81 in infra/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] infra/src/lib.rs#L81

warning: redundant field names in struct initialization --> infra/src/lib.rs:81:25 | 81 | condition: condition, | ^^^^^^^^^^^^^^^^^^^^ help: replace it with: `condition` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names
Raw output
infra/src/lib.rs:81:25:w:warning: redundant field names in struct initialization
  --> infra/src/lib.rs:81:25
   |
81 |                         condition: condition,
   |                         ^^^^^^^^^^^^^^^^^^^^ help: replace it with: `condition`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names


__END__
},
))
}
Expand All @@ -112,37 +94,42 @@ pub struct OptdQueryPlanner {
}

impl OptdQueryPlanner {
fn convert_into_optd_scalar(predicate_expr: Expr) -> Arc<ScalarOperator<ScalarLink>> {
fn convert_into_optd_scalar(predicate_expr: Expr) -> Scalar {

Check warning on line 97 in infra/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] infra/src/lib.rs#L97

warning: unused variable: `predicate_expr` --> infra/src/lib.rs:97:33 | 97 | fn convert_into_optd_scalar(predicate_expr: Expr) -> Scalar { | ^^^^^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_predicate_expr`
Raw output
infra/src/lib.rs:97:33:w:warning: unused variable: `predicate_expr`
  --> infra/src/lib.rs:97:33
   |
97 |     fn convert_into_optd_scalar(predicate_expr: Expr) -> Scalar {
   |                                 ^^^^^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_predicate_expr`


__END__
// TODO: Implement the conversion logic here
Arc::new(ScalarOperator::new())
Scalar {}
}

fn convert_into_optd_logical(plan_node: &LogicalPlan) -> Arc<LogicalOperator<LogicalLink>> {
match plan_node {
LogicalPlan::Filter(filter) => {
Arc::new(LogicalOperator::Filter(LogicalFilterOperator {
child: LogicalLink::LogicalNode(Self::convert_into_optd_logical(&filter.input)),
predicate: LogicalLink::ScalarNode(Self::convert_into_optd_scalar(
filter.predicate.clone(),
)),
predicate: Self::convert_into_optd_scalar(filter.predicate.clone()),
}))
}

LogicalPlan::Join(join) => Arc::new(LogicalOperator::Join(
(LogicalJoinOperator {
join_type: (),
left: LogicalLink::LogicalNode(Self::convert_into_optd_logical(&join.left)),
right: LogicalLink::LogicalNode(Self::convert_into_optd_logical(&join.right)),
condition: LogicalLink::ScalarNode(Arc::new(todo!())),
}),
)),

LogicalPlan::TableScan(table_scan) => Arc::new(LogicalOperator::Scan(
(LogicalScanOperator {
LogicalPlan::Join(join) => Arc::new(LogicalOperator::Join(LogicalJoinOperator {
join_type: (),
left: LogicalLink::LogicalNode(Self::convert_into_optd_logical(&join.left)),
right: LogicalLink::LogicalNode(Self::convert_into_optd_logical(&join.right)),
condition: Arc::new(
join.on
.iter()
.map(|(left, right)| {
let left_scalar = Self::convert_into_optd_scalar(left.clone());
let right_scalar = Self::convert_into_optd_scalar(right.clone());
(left_scalar, right_scalar)
})
.collect(),
),
})),

LogicalPlan::TableScan(table_scan) => {
Arc::new(LogicalOperator::Scan(LogicalScanOperator {
table_name: table_scan.table_name.to_quoted_string(),
predicate: None, // TODO fix this: there are multiple predicates in the scan but our IR only accepts one
}),
)),
}))
}
_ => panic!("OptD does not support this type of query yet"),
}
}
Expand Down
4 changes: 3 additions & 1 deletion infra/src/types/operator/logical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Type representations of logical operators in (materialized) query plans.
use std::sync::Arc;

use crate::types::operator::Scalar;

/// A type representing a logical operator in an input logical query plan.
Expand Down Expand Up @@ -40,5 +42,5 @@ pub struct LogicalJoinOperator<Link> {
pub join_type: (),
pub left: Link,
pub right: Link,
pub condition: Scalar,
pub condition: Arc<Vec<(Scalar, Scalar)>>,
}
4 changes: 3 additions & 1 deletion infra/src/types/operator/physical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Type representations of physical operators in (materialized) query plans.
use std::sync::Arc;

use crate::types::operator::Scalar;

/// A type representing a physical operator in an output physical query execution plan.
Expand Down Expand Up @@ -41,5 +43,5 @@ pub struct HashJoinOperator<Link> {
pub join_type: (),
pub left: Link,
pub right: Link,
pub condition: Scalar,
pub condition: Arc<Vec<(Scalar, Scalar)>>,
}

0 comments on commit cbfbb88

Please sign in to comment.