WIP Built some initial e2e datafusion infrastructure #10

Closed · wants to merge 36 commits

Commits (36)
c523704
Built some initial e2e datafusion infrastructure
SarveshOO7 Jan 20, 2025
7bb93f6
Cleanup and add optd types
SarveshOO7 Jan 27, 2025
33a026b
Moved from crate to submodules
SarveshOO7 Jan 27, 2025
6e77e54
move to types submodule
connortsui20 Jan 27, 2025
a5ca041
Cleanup of code base
SarveshOO7 Jan 27, 2025
469b36c
fix plan types
connortsui20 Jan 27, 2025
0fa7b5d
add boilerplate for physical to execution plan
connortsui20 Jan 28, 2025
54f6f19
better panic messages
connortsui20 Jan 28, 2025
1cf2c82
Move setup into lib.rs
SarveshOO7 Jan 28, 2025
635692f
Merge branch 'sarvesh/infra' of https://github.com/cmu-db/optd into s…
SarveshOO7 Jan 28, 2025
2a40cf5
first draft of a datafusion logical to optd logical plan conversion …
SarveshOO7 Jan 28, 2025
3e9a7cb
replace arc with references and add another helper function
SarveshOO7 Jan 28, 2025
d28b46a
Conversion function type checks
SarveshOO7 Jan 28, 2025
0bc6a13
Merged Connor's type updates: remove partial physical plan and fix sc…
SarveshOO7 Jan 28, 2025
cbfbb88
Change join condition to match datafusion and fix type errors from pr…
SarveshOO7 Jan 28, 2025
28cb5dc
pull in the types from main
SarveshOO7 Jan 30, 2025
5c4ea49
fixed imports
SarveshOO7 Jan 30, 2025
ebd8675
Completed basic outline that type checks
SarveshOO7 Jan 31, 2025
ab06687
Merge remote-tracking branch 'origin/main' into sarvesh/infra
SarveshOO7 Jan 31, 2025
ae97fec
Renamed to optd-datafusion
SarveshOO7 Jan 31, 2025
238a210
Implement datafusion to optd conversion for table scan
SarveshOO7 Jan 31, 2025
1802923
Add the And type and also create conversion context
SarveshOO7 Jan 31, 2025
5e626e7
setup more skeleton code for the optd to datafusion conversion
SarveshOO7 Jan 31, 2025
8587134
implemented colref to and from optd
SarveshOO7 Feb 3, 2025
a99f8f2
Select * from Foo works but does not have column names
SarveshOO7 Feb 3, 2025
2056342
rearrange code into separate files
SarveshOO7 Feb 3, 2025
ebcaf7c
code cleanup
SarveshOO7 Feb 3, 2025
0ac2460
setup unit testing of sql queries
SarveshOO7 Feb 3, 2025
b4adffc
Add column ids for the output
SarveshOO7 Feb 3, 2025
0f6e838
Filters do not work right now because Expr::Cast has not been impleme…
SarveshOO7 Feb 4, 2025
e38a741
Conversion implementation for NestedLoopJoin
SarveshOO7 Feb 4, 2025
01eed48
partially handled cast without casting
SarveshOO7 Feb 4, 2025
3e4511e
Nested loop join conversions work correctly now
SarveshOO7 Feb 4, 2025
069d612
code cleanup
SarveshOO7 Feb 4, 2025
1563faa
removed float from constants
SarveshOO7 Feb 4, 2025
f04bfa4
added comments for planner and for conversion context
SarveshOO7 Feb 6, 2025
3,117 changes: 3,103 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,3 +1,3 @@
[workspace]
members = ["optd-core"]
members = [ "optd-datafusion","optd-core"]
resolver = "2"
6 changes: 6 additions & 0 deletions optd-core/src/operator/scalar/and.rs
@@ -0,0 +1,6 @@
/// Binary And expression for scalar values.
#[derive(Clone)]
pub struct And<Scalar> {
    pub left: Scalar,
    pub right: Scalar,
}
5 changes: 4 additions & 1 deletion optd-core/src/operator/scalar/column_ref.rs
@@ -1,3 +1,6 @@
/// Column reference using position index (e.g. #0 for first column)
#[derive(Clone)]
pub struct ColumnRef(()); // TODO(alexis): Mocked for now.
pub struct ColumnRef {
    /// Index of the column
    pub column_idx: usize,
}
2 changes: 0 additions & 2 deletions optd-core/src/operator/scalar/constants.rs
@@ -5,8 +5,6 @@ pub enum Constant {
    String(String),
    /// Integer constant (e.g. 42).
    Integer(i64),
    /// Floating point constant (e.g. 3.14).
    Float(f64),
    /// Boolean constant (e.g. true, false).
    Boolean(bool),
}
6 changes: 6 additions & 0 deletions optd-core/src/operator/scalar/equal.rs
@@ -0,0 +1,6 @@
/// Binary equality expression for scalar values.
#[derive(Clone)]
pub struct Equal<Scalar> {
    pub left: Scalar,
    pub right: Scalar,
}
6 changes: 6 additions & 0 deletions optd-core/src/operator/scalar/mod.rs
@@ -1,11 +1,15 @@
//! Type definitions for scalar operators.
pub mod add;
pub mod and;
pub mod column_ref;
pub mod constants;
pub mod equal;

use add::Add;
use and::And;
use column_ref::ColumnRef;
use constants::Constant;
use equal::Equal;

/// Each variant of `ScalarOperator` represents a specific kind of scalar operator.
///
@@ -20,7 +24,9 @@ use constants::Constant;
/// [`PartialLogicalPlan`]: crate::plan::partial_logical_plan::PartialLogicalPlan
#[derive(Clone)]
pub enum ScalarOperator<Scalar> {
    And(And<Scalar>),
    Add(Add<Scalar>),
    ColumnRef(ColumnRef),
    Constant(Constant),
    Equal(Equal<Scalar>),
}
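
For orientation, here is a minimal sketch (not part of this diff) of how the new scalar variants nest to form a predicate such as `id = 2`. The `Expr` wrapper below is a hypothetical stand-in for the crate's `ScalarPlan` type, used only to keep the example self-contained.

```rust
use optd_core::operator::scalar::{
    column_ref::ColumnRef, constants::Constant, equal::Equal, ScalarOperator,
};

// Hypothetical recursive wrapper standing in for `ScalarPlan`.
struct Expr(Box<ScalarOperator<Expr>>);

// Builds the predicate `#0 = 2`: an Equal node over a column reference
// (position 0) and an integer constant.
fn example_predicate() -> Expr {
    Expr(Box::new(ScalarOperator::Equal(Equal {
        left: Expr(Box::new(ScalarOperator::ColumnRef(ColumnRef { column_idx: 0 }))),
        right: Expr(Box::new(ScalarOperator::Constant(Constant::Integer(2)))),
    })))
}
```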
21 changes: 21 additions & 0 deletions optd-datafusion/Cargo.toml
@@ -0,0 +1,21 @@
[package]
name = "optd-datafusion"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.95"
arrow = "54.0.0"
async-trait = "0.1.85"
datafusion = "44.0.0"
futures = "0.3.31"
itertools = "0.14.0"
tokio = { version = "1.43", default-features = false, features = [
"rt",
"rt-multi-thread",
"macros",
] }
trait-variant = "0.1.2"
optd-core = { path = "../optd-core" }
async-recursion = "1"

22 changes: 22 additions & 0 deletions optd-datafusion/sql/test_filter.sql
@@ -0,0 +1,22 @@
CREATE TABLE employees (
    id BIGINT,
    name TEXT,
    department_id BIGINT
);

CREATE TABLE departments (
    id BIGINT,
    department_name TEXT
);

INSERT INTO employees VALUES
    (1, 'Alice', 1),
    (2, 'Bob', 2),
    (3, 'Charlie', 1);

INSERT INTO departments VALUES
    (1, 'Engineering'),
    (2, 'Marketing');


SELECT * FROM employees WHERE id = 2;
22 changes: 22 additions & 0 deletions optd-datafusion/sql/test_join.sql
@@ -0,0 +1,22 @@
CREATE TABLE employees (
    id INTEGER,
    name TEXT,
    department_id INTEGER
);

CREATE TABLE departments (
    id INTEGER,
    department_name TEXT
);

INSERT INTO employees VALUES
    (1, 'Alice', 1),
    (2, 'Bob', 2),
    (3, 'Charlie', 1);

INSERT INTO departments VALUES
    (1, 'Engineering'),
    (2, 'Marketing');


SELECT * FROM employees JOIN departments WHERE employees.department_id = departments.id;
22 changes: 22 additions & 0 deletions optd-datafusion/sql/test_scan.sql
@@ -0,0 +1,22 @@
CREATE TABLE employees (
    id INTEGER,
    name TEXT,
    department_id INTEGER
);

CREATE TABLE departments (
    id INTEGER,
    department_name TEXT
);

INSERT INTO employees VALUES
    (1, 'Alice', 1),
    (2, 'Bob', 2),
    (3, 'Charlie', 1);

INSERT INTO departments VALUES
    (1, 'Engineering'),
    (2, 'Marketing');


SELECT * FROM employees;
174 changes: 174 additions & 0 deletions optd-datafusion/src/converter/from_optd.rs
@@ -0,0 +1,174 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};

use async_recursion::async_recursion;
use datafusion::{
    arrow::datatypes::{Schema, SchemaRef},
    common::JoinType,
    datasource::source_as_provider,
    logical_expr::Operator,
    physical_plan::{
        expressions::{BinaryExpr, Column, Literal},
        joins::utils::{ColumnIndex, JoinFilter},
        projection::ProjectionExec,
        ExecutionPlan, PhysicalExpr,
    },
    scalar::ScalarValue,
};
use optd_core::{
    operator::{
        relational::physical::PhysicalOperator,
        scalar::{constants::Constant, ScalarOperator},
    },
    plan::{physical_plan::PhysicalPlan, scalar_plan::ScalarPlan},
};

use super::ConversionContext;

impl ConversionContext<'_> {
    #[async_recursion]
    pub async fn conv_optd_to_df_relational(
        &self,
        optimized_plan: &PhysicalPlan,
    ) -> anyhow::Result<Arc<dyn ExecutionPlan>> {
        match &*optimized_plan.node {
            PhysicalOperator::TableScan(table_scan) => {
                let source = self.tables.get(&table_scan.table_name).unwrap();
                let provider = source_as_provider(source)?;
                let filters = if let Some(ref _pred) = table_scan.predicate {
                    // split_binary_owned(pred, Operator::And)
                    todo!("Optd does not support filters inside table scan")
                } else {
                    vec![]
                };
                let plan = provider
                    .scan(self.session_state, None, &filters, None)
                    .await?;
                Ok(plan)
            }
            PhysicalOperator::Filter(filter) => {
                let input_exec = self.conv_optd_to_df_relational(&filter.child).await?;
                let physical_expr = self
                    .conv_optd_to_df_scalar(&filter.predicate, &input_exec.schema())
                    .clone();
                Ok(
                    Arc::new(datafusion::physical_plan::filter::FilterExec::try_new(
                        physical_expr,
                        input_exec,
                    )?) as Arc<dyn ExecutionPlan + 'static>,
                )
            }
            PhysicalOperator::Project(project) => {
                let input_exec = self.conv_optd_to_df_relational(&project.child).await?;
                let physical_exprs = project
                    .fields
                    .to_vec()
                    .into_iter()
                    .map(|field| {
                        self.conv_optd_to_df_scalar(&field, &input_exec.schema())
                            .clone()
                    })
                    .enumerate()
                    .map(|(idx, expr)| (expr, format!("col{}", idx)))
                    .collect::<Vec<(Arc<dyn PhysicalExpr>, String)>>();

                Ok(
                    Arc::new(ProjectionExec::try_new(physical_exprs, input_exec)?)
                        as Arc<dyn ExecutionPlan + 'static>,
                )
            }
            PhysicalOperator::NestedLoopJoin(nested_loop_join) => {
                let left_exec = self
                    .conv_optd_to_df_relational(&nested_loop_join.outer)
                    .await?;
                let right_exec = self
                    .conv_optd_to_df_relational(&nested_loop_join.inner)
                    .await?;
                let filter_schema = {
                    let fields = left_exec
                        .schema()
                        .fields()
                        .into_iter()
                        .chain(right_exec.schema().fields().into_iter())
                        .cloned()
                        .collect::<Vec<_>>();
                    Schema::new_with_metadata(fields, HashMap::new())
                };

                let physical_expr = self.conv_optd_to_df_scalar(
                    &nested_loop_join.condition,
                    &Arc::new(filter_schema.clone()),
                );

                let join_type = JoinType::from_str(&nested_loop_join.join_type)?;

                let mut column_idxs = vec![];
                for i in 0..left_exec.schema().fields().len() {
                    column_idxs.push(ColumnIndex {
                        index: i,
                        side: datafusion::common::JoinSide::Left,
                    });
                }
                for i in 0..right_exec.schema().fields().len() {
                    column_idxs.push(ColumnIndex {
                        index: i,
                        side: datafusion::common::JoinSide::Right,
                    });
                }

                Ok(Arc::new(
                    datafusion::physical_plan::joins::NestedLoopJoinExec::try_new(
                        left_exec,
                        right_exec,
                        Some(JoinFilter::new(physical_expr, column_idxs, filter_schema)),
                        &join_type,
                    )?,
                ) as Arc<dyn ExecutionPlan + 'static>)
            }
            PhysicalOperator::HashJoin(_hash_join) => todo!(),
            PhysicalOperator::SortMergeJoin(_merge_join) => todo!(),
        }
    }

    pub fn conv_optd_to_df_scalar(
        &self,
        pred: &ScalarPlan,
        context: &SchemaRef,
    ) -> Arc<dyn PhysicalExpr> {
        match &*pred.node {
            ScalarOperator::ColumnRef(column_ref) => {
                let idx = column_ref.column_idx;
                Arc::new(
                    // DataFusion checks that the column expression name matches the schema,
                    // so we have to supply the name inferred by DataFusion instead of using
                    // our own logical properties.
                    Column::new(context.fields()[idx].name(), idx),
                )
            }
            ScalarOperator::Constant(constant) => {
                let value = match constant {
                    Constant::String(value) => ScalarValue::Utf8(Some(value.clone())),
                    Constant::Integer(value) => ScalarValue::Int64(Some(value.clone())),
                    Constant::Boolean(value) => ScalarValue::Boolean(Some(value.clone())),
                };
                Arc::new(Literal::new(value))
            }
            ScalarOperator::And(and) => {
                let left = self.conv_optd_to_df_scalar(&and.left, context);
                let right = self.conv_optd_to_df_scalar(&and.right, context);
                let op = Operator::And;
                Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
            }
            ScalarOperator::Add(add) => {
                let left = self.conv_optd_to_df_scalar(&add.left, context);
                let right = self.conv_optd_to_df_scalar(&add.right, context);
                let op = Operator::Plus;
                Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
            }
            ScalarOperator::Equal(equal) => {
                let left = self.conv_optd_to_df_scalar(&equal.left, context);
                let right = self.conv_optd_to_df_scalar(&equal.right, context);
                let op = Operator::Eq;
                Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
            }
        }
    }
}
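
As a rough illustration of what `conv_optd_to_df_scalar` produces, the sketch below hand-builds the DataFusion physical expression corresponding to the predicate `#0 = 2` against an assumed two-column schema. This is not code from the PR; the schema and the function name `expected_filter_expr` are made up for the example.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::Operator;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::PhysicalExpr;
use datafusion::scalar::ScalarValue;

// Hand-built equivalent of converting `Equal(ColumnRef(0), Constant::Integer(2))`:
// the column name is taken from the schema, mirroring the ColumnRef arm above.
fn expected_filter_expr() -> Arc<dyn PhysicalExpr> {
    // Assumed schema for illustration: (id BIGINT, name TEXT).
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let column = Arc::new(Column::new(schema.field(0).name(), 0));
    let literal = Arc::new(Literal::new(ScalarValue::Int64(Some(2))));
    Arc::new(BinaryExpr::new(column, Operator::Eq, literal))
}
```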