
feat(core): new task graph without rules and merging #55

Closed
wants to merge 73 commits into from
2fb0c7b
refactor Message
yliang412 Apr 1, 2025
b9fc601
add some new tasks
yliang412 Apr 1, 2025
7ea3aa0
wip
yliang412 Apr 1, 2025
af4f0aa
changed the process new_physical_partial and new_logical_partial
SarveshOO7 Apr 1, 2025
6b0f15e
mark uncompiled part of process_new_physical_partial todo
yliang412 Apr 2, 2025
ad5a2d8
rename task to be created to avoid confusion
yliang412 Apr 2, 2025
4bffa2e
WIP
SarveshOO7 Apr 2, 2025
39b5ad0
tasks refactor WIP
yliang412 Apr 2, 2025
a4d924d
Finished some explore group functions
SarveshOO7 Apr 2, 2025
ee6793a
task graph WIP
yliang412 Apr 3, 2025
920d5e4
remove subscriber; clarified memo.get_best_optimized_expr
yliang412 Apr 3, 2025
b10a989
remove one todo
yliang412 Apr 3, 2025
1773cf5
complete group + goal subscription
yliang412 Apr 3, 2025
2c914f5
completed process_new_physical_partial
SarveshOO7 Apr 3, 2025
9326e26
wip: not sure whether we should do this
SarveshOO7 Apr 3, 2025
7cb4d5c
register the optimize_goal_in and cost_expr_in in `receive_new_goal_m…
yliang412 Apr 3, 2025
d79c2de
remove logical property derivation during ingest
yliang412 Apr 3, 2025
7913d13
forward best costed to goal
yliang412 Apr 4, 2025
5d70451
add cost to continue_with_costed task
yliang412 Apr 4, 2025
d8f9d64
implement create_optimize_plan_task
yliang412 Apr 4, 2025
62bfac6
finish optimize_plan and add a new client API
yliang412 Apr 5, 2025
b5b5a54
fix infinite future error
yliang412 Apr 5, 2025
8146ee7
finish derive/retrieve property
yliang412 Apr 5, 2025
67fd5e0
rename message_tx/rx to engine_tx/rx to making things explcit
yliang412 Apr 5, 2025
f7b49f2
add memo only test
yliang412 Apr 5, 2025
1d3559b
make update_physical_expr_cost eager
yliang412 Apr 5, 2025
3946265
turn off and on optimizer
yliang412 Apr 5, 2025
41d12f6
fix clippy
yliang412 Apr 5, 2025
35d7d11
add forward_result
yliang412 Apr 6, 2025
2e5cbad
function for handling forwarding to each new optimize goal's newly re…
SarveshOO7 Apr 6, 2025
6135360
some minor changes
SarveshOO7 Apr 6, 2025
031883c
first draft of merge code
SarveshOO7 Apr 6, 2025
3626cf8
handle_forward_result WIP
yliang412 Apr 6, 2025
f422a0e
fix compilation error except for all_exprs
yliang412 Apr 6, 2025
d485a4b
clean up
yliang412 Apr 6, 2025
20bc704
changes I forgot to push earlier
SarveshOO7 Apr 7, 2025
940f170
Merge branch 'yuchen/optimizer-with-new-task-graph' of https://github…
SarveshOO7 Apr 7, 2025
32d35df
fix compilation error
yliang412 Apr 7, 2025
f20cd1d
refactor memo to output ForwardResult
yliang412 Apr 7, 2025
d7c84ca
update memo
yliang412 Apr 11, 2025
4976273
Merge branch 'main' into yuchen/optimizer-with-new-task-graph
yliang412 Apr 15, 2025
71d02ec
fix build error
yliang412 Apr 15, 2025
0254895
Merge branch 'main' into yuchen/optimizer-with-new-task-graph
yliang412 Apr 15, 2025
ccceab3
fix merge conflict
yliang412 Apr 15, 2025
a725123
apply group merges #1
yliang412 Apr 15, 2025
c8e9a74
apply group merges #2
yliang412 Apr 15, 2025
f67e323
apply group merges #3 + fix #1
yliang412 Apr 15, 2025
15ec949
apply goal merges WIP #1
yliang412 Apr 16, 2025
4fc6bfe
merged @yliang412 stuff. still got a lot of errors
SarveshOO7 Apr 20, 2025
1b39290
completed cascading merges and goal merges in memo table
SarveshOO7 Apr 21, 2025
f9e70fa
whoops! didn't need that
SarveshOO7 Apr 21, 2025
b642931
handle physical stale physical expression whose cost tasks need to be…
SarveshOO7 Apr 21, 2025
42844dc
fixed asyn recursion; in-memory memo complete
SarveshOO7 Apr 21, 2025
87822de
remove stuff to get a buildable version for now
SarveshOO7 Apr 22, 2025
ac851a1
deletion of tasks and a mini rewrite of the merge result
SarveshOO7 Apr 27, 2025
f35cc73
deduplicate the publishers
SarveshOO7 Apr 27, 2025
10d41ac
small issue in group merge fixed. plus goal merge result updated
SarveshOO7 Apr 27, 2025
92f0b49
typo
SarveshOO7 Apr 27, 2025
235c939
typo 2.0
SarveshOO7 Apr 27, 2025
6c65828
goal creation can have no source
SarveshOO7 Apr 27, 2025
5e73313
merge code, tehcnically complete
SarveshOO7 Apr 30, 2025
ca4b48f
merge goals while being able to delete stuff and not have cyclic goal…
SarveshOO7 Apr 30, 2025
96b325d
rename types (#94)
connortsui20 May 1, 2025
c6e2f3d
copied the structure and code over from main
SarveshOO7 May 1, 2025
1bbbd88
fixed all compiler errors in the optimizer from merging
SarveshOO7 May 1, 2025
2299d94
created a mock catalog
SarveshOO7 May 1, 2025
bed789f
wrote the simple HIR for the test
SarveshOO7 May 1, 2025
c15b26c
rename to PropagateBestExpression
connortsui20 May 2, 2025
0494252
move memo module out of core
connortsui20 May 2, 2025
dcd694b
Merge branch 'main' into yuchen/optimizer-with-new-task-graph
connortsui20 May 2, 2025
70ab35c
fix merge conflicts
connortsui20 May 2, 2025
85b064f
format
connortsui20 May 2, 2025
839d8cd
fix clippy
connortsui20 May 2, 2025
174 changes: 87 additions & 87 deletions Cargo.lock

Large diffs are not rendered by default.

Binary file added optd/src/core/.DS_Store
Binary file not shown.
23 changes: 23 additions & 0 deletions optd/src/core/error.rs
@@ -1,7 +1,30 @@
use crate::memo::MemoizeError;
use futures::channel::{mpsc, oneshot::Canceled};

/// All optd-core errors. They are defined in their respective modules,
/// but everyone uses this common `Error` type for simplicity.
#[derive(Debug)]
#[allow(dead_code)]
pub enum Error {
Placeholder,
Channel(Box<dyn std::error::Error + Send + Sync>),
Memo(MemoizeError),
}

impl From<mpsc::SendError> for Error {
fn from(value: mpsc::SendError) -> Self {
Error::Channel(Box::new(value))
}
}

impl From<Canceled> for Error {
fn from(value: Canceled) -> Self {
Error::Channel(Box::new(value))
}
}

impl From<MemoizeError> for Error {
fn from(value: MemoizeError) -> Self {
Error::Memo(value)
}
}
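
The `From` impls above are what let call sites use `?` on channel and memo results while returning the common `Error` type. Below is a minimal, std-only sketch of that pattern. It is not the PR's code: `MemoizeError` is a hypothetical stand-in for `crate::memo::MemoizeError`, `std::sync::mpsc` replaces `futures::channel::mpsc`, and `lookup` and `lookup_and_send` are invented for illustration.

```rust
use std::fmt;
use std::sync::mpsc;

/// Hypothetical stand-in for `crate::memo::MemoizeError` (assumption).
#[derive(Debug)]
pub struct MemoizeError(pub String);

impl fmt::Display for MemoizeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "memo error: {}", self.0)
    }
}

impl std::error::Error for MemoizeError {}

/// Common error type, mirroring the shape of the enum in the diff.
#[derive(Debug)]
pub enum Error {
    Channel(Box<dyn std::error::Error + Send + Sync>),
    Memo(MemoizeError),
}

// std's SendError is generic over the message type, unlike the futures one.
impl<T: Send + Sync + 'static> From<mpsc::SendError<T>> for Error {
    fn from(value: mpsc::SendError<T>) -> Self {
        Error::Channel(Box::new(value))
    }
}

impl From<MemoizeError> for Error {
    fn from(value: MemoizeError) -> Self {
        Error::Memo(value)
    }
}

/// Hypothetical memo lookup that fails for negative ids.
pub fn lookup(id: i64) -> Result<i64, MemoizeError> {
    if id >= 0 {
        Ok(id * 2)
    } else {
        Err(MemoizeError(format!("unknown id {id}")))
    }
}

/// Each `?` converts a module-specific error into `Error` via `From`.
pub fn lookup_and_send(id: i64, tx: &mpsc::Sender<i64>) -> Result<(), Error> {
    let value = lookup(id)?; // MemoizeError -> Error::Memo
    tx.send(value)?; // SendError<i64> -> Error::Channel
    Ok(())
}
```

Boxing the channel error keeps the enum small and lets several channel error types share one variant, at the cost of losing the concrete type at the call site.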
2 changes: 1 addition & 1 deletion optd/src/core/mod.rs
@@ -1,4 +1,4 @@
pub mod bridge;
pub mod cir;
pub mod error;
pub mod memo;
pub mod optimizer;
117 changes: 117 additions & 0 deletions optd/src/core/optimizer/client.rs
@@ -0,0 +1,117 @@
use futures::{
SinkExt, StreamExt,
channel::{mpsc, oneshot},
};

use crate::{
core::cir::{LogicalPlan, PhysicalPlan},
core::error::Error,
memo::Memoize,
};

/// Unique identifier for a query instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryInstanceId(pub i64);

/// A handle to a single query being optimized.
pub struct QueryInstance {
/// The unique identifier of this query instance.
id: QueryInstanceId,
/// The logical plan associated with this query instance.
logical_plan: LogicalPlan,
client_tx: mpsc::Sender<ClientMessage>,
physical_plan_rx: mpsc::Receiver<PhysicalPlan>,
}

impl QueryInstance {
/// Gets the id of this query instance.
pub fn id(&self) -> QueryInstanceId {
self.id
}

/// Gets the logical plan for this query instance.
pub fn logical_plan(&self) -> &LogicalPlan {
&self.logical_plan
}

/// Receives the best physical plan for this query instance.
pub async fn recv_best_plan(&mut self) -> Result<PhysicalPlan, Error> {
let physical_plan = self
.physical_plan_rx
.next()
.await
.ok_or(Error::Placeholder)?;
Ok(physical_plan)
}
}

impl Drop for QueryInstance {
fn drop(&mut self) {
let message = ClientMessage::Complete {
query_instance_id: self.id,
};
let mut client_tx = self.client_tx.clone();
tokio::spawn(async move {
if let Err(e) = client_tx.send(message).await {
eprintln!("Failed to send complete message: {}", e);
}
});
}
}

pub struct Client<M: Memoize> {
tx: mpsc::Sender<ClientMessage>,
handle: tokio::task::JoinHandle<M>,
}

impl<M: Memoize> Client<M> {
pub fn new(tx: mpsc::Sender<ClientMessage>, handle: tokio::task::JoinHandle<M>) -> Self {
Self { tx, handle }
}

pub async fn create_query_instance(
&mut self,
logical_plan: LogicalPlan,
) -> Result<QueryInstance, Error> {
let (id_tx, id_rx) = oneshot::channel();
// The buffer must be larger than 0 so that sending a plan does not block the optimizer.
let (physical_plan_tx, physical_plan_rx) = mpsc::channel(1);
let message = ClientMessage::Init {
logical_plan: logical_plan.clone(),
physical_plan_tx,
id_tx,
};
self.tx.send(message).await?;
let id = id_rx.await?;

Ok(QueryInstance {
id,
logical_plan,
physical_plan_rx,
client_tx: self.tx.clone(),
})
}

/// Shuts down the optimizer, returning the memo table.
pub async fn shutdown(mut self) -> Result<M, Error> {
self.tx.send(ClientMessage::Shutdown).await?;
let memo = self.handle.await.map_err(|_| Error::Placeholder)?;
Ok(memo)
}
}

pub enum ClientMessage {
/// Message to initiate optimization of a query instance.
Init {
/// The logical plan to be ingested.
logical_plan: LogicalPlan,

/// The channel on which physical plans are sent back to the caller.
physical_plan_tx: mpsc::Sender<PhysicalPlan>,
/// The channel to send the query instance ID back to the caller.
id_tx: oneshot::Sender<QueryInstanceId>,
},
/// Message to indicate that the client is done with the query instance.
Complete {
/// The query instance ID to be completed.
query_instance_id: QueryInstanceId,
},
/// Message to shut down the optimizer.
Shutdown,
}
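
The `Init` message above carries its own reply channels, so the caller learns the query instance ID and later receives plans without sharing any state with the optimizer. The sketch below shows that request/reply shape using only the standard library. It rests on several assumptions: threads and `std::sync::mpsc` replace tokio tasks and `futures` channels, plans are plain `String`s, and `spawn_optimizer` and `optimize_once` are hypothetical helpers, not part of the PR.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryInstanceId(pub i64);

/// Simplified message set; plans are plain strings here (assumption).
pub enum ClientMessage {
    Init {
        logical_plan: String,
        physical_plan_tx: mpsc::Sender<String>,
        id_tx: mpsc::Sender<QueryInstanceId>,
    },
    Shutdown,
}

/// Toy optimizer loop. It hands out increasing ids and "optimizes" by
/// wrapping the plan text. The returned count stands in for the memo table.
pub fn spawn_optimizer() -> (mpsc::Sender<ClientMessage>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut next_id: i64 = 0;
        for message in rx {
            match message {
                ClientMessage::Init { logical_plan, physical_plan_tx, id_tx } => {
                    // Reply with the id first, then with the best plan found.
                    let _ = id_tx.send(QueryInstanceId(next_id));
                    next_id += 1;
                    let _ = physical_plan_tx.send(format!("Physical({logical_plan})"));
                }
                ClientMessage::Shutdown => break,
            }
        }
        next_id as usize
    });
    (tx, handle)
}

/// Sends one `Init` request and waits for both replies.
pub fn optimize_once(
    tx: &mpsc::Sender<ClientMessage>,
    plan: &str,
) -> Option<(QueryInstanceId, String)> {
    let (id_tx, id_rx) = mpsc::channel();
    let (physical_plan_tx, physical_plan_rx) = mpsc::channel();
    tx.send(ClientMessage::Init {
        logical_plan: plan.to_string(),
        physical_plan_tx,
        id_tx,
    })
    .ok()?;
    let id = id_rx.recv().ok()?;
    let best = physical_plan_rx.recv().ok()?;
    Some((id, best))
}
```

Sending `Shutdown` and then joining the handle mirrors `Client::shutdown`, which returns whatever the optimizer task produced.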
203 changes: 203 additions & 0 deletions optd/src/core/optimizer/egest.rs
@@ -0,0 +1,203 @@
use std::sync::Arc;

use async_recursion::async_recursion;
use futures::future::try_join_all;

use crate::{
core::cir::{
Child, GoalMemberId, Operator, PartialPhysicalPlan, PhysicalExpressionId, PhysicalPlan,
},
core::error::Error,
memo::Memoize,
};

use super::Optimizer;

impl<M: Memoize> Optimizer<M> {
/// Recursively transforms a physical expression ID in the memo into a complete physical plan.
///
/// This function retrieves the physical expression from the memo and recursively
/// transforms any child goal members into their corresponding best physical plans.
///
/// # Parameters
/// * `expression_id` - ID of the physical expression to transform into a complete plan.
///
/// # Returns
/// * `Ok(Some(PhysicalPlan))` if all child plans were successfully constructed from their IDs.
/// * `Ok(None)` if any goal ID lacks a best expression ID.
/// * `Err(Error)` if a memo operation fails.
#[async_recursion]
pub(super) async fn egest_best_plan(
&self,
expression_id: PhysicalExpressionId,
) -> Result<Option<PhysicalPlan>, Error> {
let expression = self.memo.materialize_physical_expr(expression_id).await?;

let child_results = try_join_all(
expression
.children
.iter()
.map(|child| self.egest_child_plan(child)),
)
.await?;

let child_plans = match child_results.into_iter().collect::<Option<Vec<_>>>() {
Some(plans) => plans,
None => return Ok(None),
};

Ok(Some(PhysicalPlan(Operator {
tag: expression.tag,
data: expression.data,
children: child_plans,
})))
}

/// Converts a physical expression ID to a partial physical plan.
///
/// This method materializes the expression and recursively processes its children,
/// preserving goal references as unmaterialized plans.
///
/// # Parameters
/// * `expression_id` - ID of the physical expression to convert to a partial plan.
///
/// # Returns
/// * `Ok(PartialPhysicalPlan)` - The materialized partial plan.
/// * `Err(Error)` if a memo operation fails.
pub(super) async fn egest_partial_plan(
&self,
expression_id: PhysicalExpressionId,
) -> Result<PartialPhysicalPlan, Error> {
let expression = self.memo.materialize_physical_expr(expression_id).await?;

let children = try_join_all(
expression
.children
.iter()
.map(|child| self.egest_partial_child(child.clone())),
)
.await?;

Ok(PartialPhysicalPlan::Materialized(Operator {
tag: expression.tag,
data: expression.data,
children,
}))
}

async fn egest_child_plan(
&self,
child: &Child<GoalMemberId>,
) -> Result<Option<Child<Arc<PhysicalPlan>>>, Error> {
match child {
Child::Singleton(member) => {
let plan = match self.process_goal_member(*member).await? {
Some(plan) => plan,
None => return Ok(None),
};
Ok(Some(Child::Singleton(plan.into())))
}
Child::VarLength(members) => {
let futures = members.iter().map(|member| async move {
let plan = match self.process_goal_member(*member).await? {
Some(plan) => plan,
None => return Ok::<_, Error>(None),
};
Ok(Some(plan.into()))
});

let result_plans = match try_join_all(futures).await?.into_iter().collect() {
Some(plans) => plans,
None => return Ok(None),
};

Ok(Some(Child::VarLength(result_plans)))
}
}
}

async fn process_goal_member(
&self,
member: GoalMemberId,
) -> Result<Option<PhysicalPlan>, Error> {
match member {
GoalMemberId::PhysicalExpressionId(expr_id) => self.egest_best_plan(expr_id).await,
GoalMemberId::GoalId(goal_id) => {
let (best_expr_id, _) =
match self.memo.get_best_optimized_physical_expr(goal_id).await? {
Some(expr) => expr,
None => return Ok(None),
};

self.egest_best_plan(best_expr_id).await
}
}
}

async fn egest_partial_child(
&self,
child: Child<GoalMemberId>,
) -> Result<Child<Arc<PartialPhysicalPlan>>, Error> {
match child {
Child::Singleton(member) => match member {
GoalMemberId::GoalId(goal_id) => {
let goal = self.memo.materialize_goal(goal_id).await?;
Ok(Child::Singleton(
PartialPhysicalPlan::UnMaterialized(goal).into(),
))
}
GoalMemberId::PhysicalExpressionId(expr_id) => {
let expr = self.memo.materialize_physical_expr(expr_id).await?;

let children = try_join_all(
expr.children
.iter()
.map(|child| self.egest_partial_child(child.clone())),
)
.await?;

let op = Operator {
tag: expr.tag,
data: expr.data,
children,
};

Ok(Child::Singleton(
PartialPhysicalPlan::Materialized(op).into(),
))
}
},
Child::VarLength(members) => {
let goals = try_join_all(members.into_iter().map(|member| async move {
match member {
GoalMemberId::GoalId(goal_id) => {
let goal = self.memo.materialize_goal(goal_id).await?;
Ok::<_, Error>(PartialPhysicalPlan::UnMaterialized(goal).into())
}
GoalMemberId::PhysicalExpressionId(expr_id) => {
let expr = self.memo.materialize_physical_expr(expr_id).await?;

let children = try_join_all(
expr.children
.iter()
.map(|child| self.egest_partial_child(child.clone())),
)
.await?;

let op = Operator {
tag: expr.tag,
data: expr.data,
children,
};

Ok(PartialPhysicalPlan::Materialized(op).into())
}
}
}))
.await?;

Ok(Child::VarLength(goals))
}
}
}
}
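
`egest_best_plan` returns `Ok(None)` as soon as any child goal lacks a best expression, which it gets by collecting the child results into an `Option<Vec<_>>`. The synchronous sketch below isolates that short-circuiting recursion. The `Memo` and `Plan` types are toy assumptions (integer ids, string tags), and a missing expression is folded into `None` here, whereas the code in the diff reports it as an error.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Plan {
    pub tag: String,
    pub children: Vec<Plan>,
}

/// Toy memo (assumption): expression id -> (tag, child goal ids),
/// and goal id -> best expression id found so far.
pub struct Memo {
    pub exprs: HashMap<u32, (String, Vec<u32>)>,
    pub best: HashMap<u32, u32>,
}

/// Builds the full plan rooted at `expr_id`, or `None` if any goal in
/// the tree has no best expression yet.
pub fn egest_best_plan(memo: &Memo, expr_id: u32) -> Option<Plan> {
    let (tag, child_goals) = memo.exprs.get(&expr_id)?;
    let children = child_goals
        .iter()
        .map(|goal| {
            let best_expr = memo.best.get(goal)?;
            egest_best_plan(memo, *best_expr)
        })
        // Collecting into Option<Vec<_>> yields None on the first None child.
        .collect::<Option<Vec<_>>>()?;
    Some(Plan {
        tag: tag.clone(),
        children,
    })
}
```

The async version in the diff differs in that it awaits all child futures with `try_join_all` before collecting, so errors short-circuit in the `Result` layer and missing plans in the `Option` layer.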