Skip to content

Commit 2a40cf5

Browse files
committed
first draft of an datafusion logical to optd logical plan conversion function
1 parent 635692f commit 2a40cf5

File tree

7 files changed

+71
-20
lines changed

7 files changed

+71
-20
lines changed

infra/src/lib.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use datafusion::execution::SessionStateBuilder;
1717
use datafusion::logical_expr::{Explain, LogicalPlan, PlanType, TableSource, ToStringifiedPlan};
1818
use datafusion::physical_plan::ExecutionPlan;
1919
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
20-
use datafusion::prelude::{SessionConfig, SessionContext};
20+
use datafusion::prelude::{Expr, SessionConfig, SessionContext};
2121

2222
/// TODO make distinction between relational groups and scalar groups.
2323
#[repr(transparent)]
@@ -29,7 +29,11 @@ pub struct GroupId(u64);
2929
pub struct ExprId(u64);
3030

3131
mod types;
32-
use types::plan::logical_plan::LogicalPlan as OptDLogicalPlan;
32+
use types::operator::logical::{
33+
LogicalFilterOperator, LogicalJoinOperator, LogicalOperator, LogicalScanOperator,
34+
};
35+
use types::operator::ScalarOperator;
36+
use types::plan::logical_plan::{LogicalLink, LogicalPlan as OptDLogicalPlan, ScalarLink};
3337

3438
struct OptdOptimizer {}
3539

@@ -38,14 +42,47 @@ pub struct OptdQueryPlanner {
3842
}
3943

4044
impl OptdQueryPlanner {
41-
fn convert_into_optd_logical(plan_node: LogicalPlan) -> OptDLogicalPlan {
42-
match plan_node {
43-
LogicalPlan::Filter(filter) => todo!(),
44-
LogicalPlan::Join(join) => todo!(),
45-
LogicalPlan::TableScan(table_scan) => todo!(),
45+
fn convert_into_optd_scalar(predicate_expr: Expr) -> Arc<ScalarOperator<ScalarLink>> {
46+
// TODO: Implement the conversion logic here
47+
Arc::new(ScalarOperator::new())
48+
}
49+
50+
fn convert_into_optd_logical(plan_node: Arc<LogicalPlan>) -> Arc<LogicalOperator<LogicalLink>> {
51+
match &*plan_node {
52+
LogicalPlan::Filter(filter) => {
53+
Arc::new(LogicalOperator::Filter(LogicalFilterOperator {
54+
child: LogicalLink::LogicalNode(Self::convert_into_optd_logical(
55+
filter.input.clone(),
56+
)),
57+
predicate: LogicalLink::ScalarNode(Self::convert_into_optd_scalar(
58+
filter.predicate.clone(),
59+
)),
60+
}))
61+
}
62+
63+
LogicalPlan::Join(join) => Arc::new(LogicalOperator::Join(
64+
(LogicalJoinOperator {
65+
join_type: (),
66+
left: LogicalLink::LogicalNode(Self::convert_into_optd_logical(
67+
join.left.clone(),
68+
)),
69+
right: LogicalLink::LogicalNode(Self::convert_into_optd_logical(
70+
join.right.clone(),
71+
)),
72+
condition: LogicalLink::ScalarNode(Arc::new(todo!())),
73+
}),
74+
)),
75+
76+
LogicalPlan::TableScan(table_scan) => Arc::new(LogicalOperator::Scan(
77+
(LogicalScanOperator {
78+
table_name: table_scan.table_name.to_quoted_string(),
79+
predicate: None, // TODO fix this: there are multiple predicates in the scan but our IR only accepts one
80+
}),
81+
)),
4682
_ => panic!("OptD does not support this type of query yet"),
4783
}
4884
}
85+
4986
async fn create_physical_plan_inner(
5087
&self,
5188
logical_plan: &LogicalPlan,

infra/src/main.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use datafusion::physical_plan::ExecutionPlanProperties;
66
use datafusion::physical_plan::Partitioning;
77
use datafusion::prelude::SessionConfig;
88
use futures::StreamExt;
9-
use std::{io, time::SystemTime};
109
use infra::create_df_context;
10+
use std::{io, time::SystemTime};
1111

1212
#[tokio::main]
1313
async fn main() -> Result<()> {
@@ -16,13 +16,9 @@ async fn main() -> Result<()> {
1616

1717
let session_config = SessionConfig::from_env()?.with_information_schema(true);
1818

19-
let ctx = crate::create_df_context(
20-
Some(session_config.clone()),
21-
Some(rt_config.clone()),
22-
None
23-
)
24-
.await
25-
.unwrap();
19+
let ctx = crate::create_df_context(Some(session_config.clone()), Some(rt_config.clone()), None)
20+
.await
21+
.unwrap();
2622

2723
// Create a DataFrame with the input query
2824
let queries = io::read_to_string(io::stdin())?;

infra/src/types/memo/rule.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use super::Memo;
22
use crate::{
33
types::expression::{relational::logical::LogicalExpr, Expr},
4-
types::plan::{partial_logical_plan::PartialLogicalPlan, partial_physical_plan::PartialPhysicalPlan},
4+
types::plan::{
5+
partial_logical_plan::PartialLogicalPlan, partial_physical_plan::PartialPhysicalPlan,
6+
},
57
};
68

79
#[trait_variant::make(Send)]

infra/src/types/operator/logical.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub enum LogicalOperator<Link> {
2525
/// TODO Add docs.
2626
pub struct LogicalScanOperator<Link> {
2727
pub table_name: String,
28-
pub predicate: Link,
28+
pub predicate: Option<Link>,
2929
}
3030

3131
/// TODO Add docs.

infra/src/types/operator/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::{marker::PhantomData, sync::Arc};
22

3+
use super::plan::logical_plan::ScalarLink;
4+
35
pub mod logical;
46
pub mod physical;
57

@@ -10,3 +12,12 @@ pub mod physical;
1012
pub struct ScalarOperator<Link> {
1113
_phantom: PhantomData<Link>,
1214
}
15+
16+
impl ScalarOperator<ScalarLink> {
17+
// Add a public constructor
18+
pub fn new() -> Self {
19+
ScalarOperator {
20+
_phantom: std::marker::PhantomData,
21+
}
22+
}
23+
}

infra/src/types/plan/partial_physical_plan.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::types::operator::{logical::LogicalOperator, physical::PhysicalOperator, ScalarOperator};
1+
use crate::types::operator::{
2+
logical::LogicalOperator, physical::PhysicalOperator, ScalarOperator,
3+
};
24
use crate::GroupId;
35
use std::sync::Arc;
46

infra/src/types/plan/physical_plan.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@ use crate::types::operator::{
44
ScalarOperator,
55
};
66
use datafusion::{
7-
common::{arrow::datatypes::Schema, JoinType}, datasource::physical_plan::{CsvExecBuilder, FileScanConfig}, execution::object_store::ObjectStoreUrl, physical_plan::{
7+
common::{arrow::datatypes::Schema, JoinType},
8+
datasource::physical_plan::{CsvExecBuilder, FileScanConfig},
9+
execution::object_store::ObjectStoreUrl,
10+
physical_plan::{
811
expressions::NoOp,
912
filter::FilterExec,
1013
joins::{HashJoinExec, PartitionMode},
1114
ExecutionPlan,
12-
}
15+
},
1316
};
1417
use std::sync::Arc;
1518

0 commit comments

Comments
 (0)