Skip to content

Commit

Permalink
use trigger to merge groups
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Feb 3, 2025
1 parent 2f00009 commit 00be567
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 105 deletions.
70 changes: 36 additions & 34 deletions optd-core/src/memo/persistent_memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,27 @@ pub struct PersistentMemo {
}

impl PersistentMemo {
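/// Create a new persistent memo table backed by a SQLite database at the given URL.
///
/// # Errors
///
/// Returns an error if the storage manager cannot be created for `database_url`, if running
/// the migrations fails, if a database connection cannot be obtained, or if the query for
/// fetching all logical expressions in a group cannot be prepared.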
pub async fn new(database_url: &str) -> anyhow::Result<Self> {

Check warning on line 17 in optd-core/src/memo/persistent_memo.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] optd-core/src/memo/persistent_memo.rs#L17

warning: docs for function returning `Result` missing `# Errors` section --> optd-core/src/memo/persistent_memo.rs:17:5 | 17 | pub async fn new(database_url: &str) -> anyhow::Result<Self> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc note: the lint level is defined here --> optd-core/src/lib.rs:5:9 | 5 | #![warn(clippy::missing_errors_doc)] | ^^^^^^^^^^^^^^^^^^^^^^^^^^
Raw output
optd-core/src/memo/persistent_memo.rs:17:5:w:warning: docs for function returning `Result` missing `# Errors` section
  --> optd-core/src/memo/persistent_memo.rs:17:5
   |
17 |     pub async fn new(database_url: &str) -> anyhow::Result<Self> {
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc
note: the lint level is defined here
  --> optd-core/src/lib.rs:5:9
   |
5  | #![warn(clippy::missing_errors_doc)]
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^


__END__
let storage = StorageManager::new(database_url).await?;
storage.migrate().await?;
let get_all_logical_exprs_in_group_query = get_all_logical_exprs_in_group_query();
storage
.db()
.await?
.prepare(&get_all_logical_exprs_in_group_query)
.await?;
Ok(Self {
storage,
get_all_logical_exprs_in_group_query,
})
}

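/// Create a new persistent memo table backed by an in-memory SQLite database.
///
/// # Errors
///
/// Returns any error produced by [`Self::new`] when called with the `sqlite::memory:` URL.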
pub async fn new_in_memory() -> anyhow::Result<Self> {

Check warning on line 33 in optd-core/src/memo/persistent_memo.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] optd-core/src/memo/persistent_memo.rs#L33

warning: docs for function returning `Result` missing `# Errors` section --> optd-core/src/memo/persistent_memo.rs:33:5 | 33 | pub async fn new_in_memory() -> anyhow::Result<Self> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc
Raw output
optd-core/src/memo/persistent_memo.rs:33:5:w:warning: docs for function returning `Result` missing `# Errors` section
  --> optd-core/src/memo/persistent_memo.rs:33:5
   |
33 |     pub async fn new_in_memory() -> anyhow::Result<Self> {
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#missing_errors_doc


__END__
Self::new("sqlite::memory:").await
}

/// Creates a new scalar group for testing purposes.
#[cfg(test)]
pub async fn new_scalar_group_for_test(&self) -> anyhow::Result<ScalarGroupId> {
Expand All @@ -30,20 +51,6 @@ impl PersistentMemo {
Ok(scalar_group_id)
}

/// Create a new persistent memo table on top of an existing storage manager.
///
/// The query text for fetching all logical expressions in a group is built once, prepared
/// against a connection obtained from `storage`, and then stored in the returned memo.
///
/// # Errors
///
/// Returns an error if a database connection cannot be obtained from `storage` or if
/// preparing the query fails.
pub async fn new(storage: StorageManager) -> anyhow::Result<Self> {
// The local deliberately shadows the free function of the same name so that the
// field-init shorthand below can be used.
let get_all_logical_exprs_in_group_query = get_all_logical_exprs_in_group_query();
// The prepared statement itself is discarded here.
// NOTE(review): presumably this validates and/or caches the query up front — confirm.
storage
.db()
.await?
.prepare(&get_all_logical_exprs_in_group_query)
.await?;
Ok(Self {
storage,
get_all_logical_exprs_in_group_query,
})
}

async fn get_representative_group_id(
&self,
db: &mut SqliteConnection,
Expand Down Expand Up @@ -287,34 +294,35 @@ fn get_all_logical_exprs_in_group_query() -> String {

#[cfg(test)]
mod tests {
use crate::operator::relational::{logical::join::JoinType, RelationChildren};
use crate::operator::relational::{
logical::{self, join::JoinType},
RelationChildren,
};

use super::*;

#[tokio::test]
async fn test_insert_logical_expr_with_memo() -> anyhow::Result<()> {
let storage = StorageManager::new_in_memory().await?;
storage.migrate().await?;
let memo = PersistentMemo::new(storage).await?;
let memo = PersistentMemo::new_in_memory().await?;

let predicate_group_id = memo.new_scalar_group_for_test().await?;
let scan1 = LogicalExpression::scan("t1", predicate_group_id);
let scan1 = logical::scan("t1", predicate_group_id);
let scan1_group = memo.add_logical_expr(&scan1).await?;
let dup_scan1_group = memo.add_logical_expr(&scan1).await?;
assert_eq!(scan1_group, dup_scan1_group);
let status = memo.get_group_exploration_status(scan1_group).await?;
assert_eq!(status, ExplorationStatus::Unexplored);

let predicate_group_id = memo.new_scalar_group_for_test().await?;
let scan2 = LogicalExpression::scan("t2", predicate_group_id);
let scan2 = logical::scan("t2", predicate_group_id);
let scan2_group = memo.add_logical_expr(&scan2).await?;
let dup_scan2_group = memo.add_logical_expr(&scan2).await?;
assert_eq!(scan2_group, dup_scan2_group);
let status = memo.get_group_exploration_status(scan1_group).await?;
assert_eq!(status, ExplorationStatus::Unexplored);

let join_cond_group_id = memo.new_scalar_group_for_test().await?;
let join = LogicalExpression::join(
let join = logical::join(
JoinType::Inner,
scan1_group,
scan2_group,
Expand All @@ -329,7 +337,7 @@ mod tests {
memo.set_group_exploration_status(join_group, ExplorationStatus::Exploring)
.await?;

let join_alt = LogicalExpression::join(
let join_alt = logical::join(
JoinType::Inner,
scan2_group,
scan1_group,
Expand Down Expand Up @@ -371,32 +379,26 @@ mod tests {

#[tokio::test]
async fn test_merge_group() -> anyhow::Result<()> {
let storage = StorageManager::new("sqlite://memo.db?mode=rwc").await?;
storage.migrate().await?;
let memo = PersistentMemo::new(storage).await?;
let memo = PersistentMemo::new_in_memory().await?;

let true_predicate_group = memo.new_scalar_group_for_test().await?;
let scan = LogicalExpression::scan("t1", true_predicate_group);
let scan = logical::scan("t1", true_predicate_group);
let scan_group = memo.add_logical_expr(&scan).await?;

let filter = LogicalExpression::filter(scan_group, true_predicate_group);
let filter = logical::filter(scan_group, true_predicate_group);
let filter_group = memo.add_logical_expr(&filter).await?;
let one_equal_one_predicate_group = memo.new_scalar_group_for_test().await?;
let top_filter = LogicalExpression::filter(filter_group, one_equal_one_predicate_group);
let top_filter = logical::filter(filter_group, one_equal_one_predicate_group);
let top_filter_group = memo.add_logical_expr(&top_filter).await?;
let top_filter_2 = LogicalExpression::filter(scan_group, one_equal_one_predicate_group);
let top_filter_2 = logical::filter(scan_group, one_equal_one_predicate_group);
let top_filter_2_group = memo
.add_logical_expr_to_group(&top_filter_2, top_filter_group)
.await?;
assert_eq!(top_filter_group, top_filter_2_group);
let new_group = memo
let _ = memo
.add_logical_expr_to_group(&scan, top_filter_group)
.await?;

println!("new group: {new_group:?}");

// Filter: true
// - Scan: t1
Ok(())
}
}
45 changes: 27 additions & 18 deletions optd-core/src/operator/relational/logical/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,35 @@ pub enum LogicalOperatorKind {
Join,
}

impl<Relation, Scalar> LogicalOperator<Relation, Scalar> {
/// Builds the `Scan` variant of this operator from a table name and a predicate.
///
/// Both arguments are forwarded unchanged to `Scan::new`.
pub fn scan(table_name: &str, predicate: Scalar) -> Self {
    let scan_operator = Scan::new(table_name, predicate);
    Self::Scan(scan_operator)
}
/// Builds the [`LogicalOperator::Scan`] variant from a table name and a predicate.
///
/// Both arguments are forwarded unchanged to `Scan::new`; the resulting scan is then
/// wrapped in the operator enum.
pub fn scan<Relation, Scalar>(
    table_name: &str,
    predicate: Scalar,
) -> LogicalOperator<Relation, Scalar> {
    let scan_operator = Scan::new(table_name, predicate);
    LogicalOperator::Scan(scan_operator)
}

/// Builds the `Filter` variant of this operator from a child relation and a predicate.
///
/// Both arguments are forwarded unchanged to `Filter::new`.
pub fn filter(child: Relation, predicate: Scalar) -> Self {
    let filter_operator = Filter::new(child, predicate);
    Self::Filter(filter_operator)
}
/// Builds the [`LogicalOperator::Filter`] variant from a child relation and a predicate.
///
/// Both arguments are forwarded unchanged to `Filter::new`; the resulting filter is then
/// wrapped in the operator enum.
pub fn filter<Relation, Scalar>(
    child: Relation,
    predicate: Scalar,
) -> LogicalOperator<Relation, Scalar> {
    let filter_operator = Filter::new(child, predicate);
    LogicalOperator::Filter(filter_operator)
}

/// Builds the `Join` variant of this operator.
///
/// The join type, the left and right child relations, and the join condition are moved
/// into a `Join` value as-is.
pub fn join(join_type: JoinType, left: Relation, right: Relation, condition: Scalar) -> Self {
    let join_operator = Join {
        join_type,
        left,
        right,
        condition,
    };
    Self::Join(join_operator)
}
/// Builds the [`LogicalOperator::Join`] variant.
///
/// The join type, the left and right child relations, and the join condition are moved
/// into a `Join` value as-is, which is then wrapped in the operator enum.
pub fn join<Relation, Scalar>(
    join_type: JoinType,
    left: Relation,
    right: Relation,
    condition: Scalar,
) -> LogicalOperator<Relation, Scalar> {
    let join_operator = Join {
        join_type,
        left,
        right,
        condition,
    };
    LogicalOperator::Join(join_operator)
}

impl<Relation, Scalar> RelationChildren for LogicalOperator<Relation, Scalar>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,3 @@ CREATE TABLE relation_groups (
FOREIGN KEY (representative_group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE UNIQUE INDEX relation_groups_representative_group_id ON relation_groups (representative_group_id);
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@ CREATE TABLE scalar_groups (
FOREIGN KEY (representative_group_id) REFERENCES scalar_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

CREATE UNIQUE INDEX scalar_groups_representative_group_id ON scalar_groups (representative_group_id);
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE logical_expressions (
-- A unique identifier for a logical expression in the optimizer.
id INTEGER NOT NULL PRIMARY KEY,
-- The representative group that a logical expression belongs to.
group_id BIGINT NOT NULL,
group_id BIGINT NOT NULL ON CONFLICT REPLACE,
-- The kind of the logical operator.
operator_kind TEXT NOT NULL,
-- The exploration status of a logical expression.
Expand All @@ -11,6 +11,12 @@ CREATE TABLE logical_expressions (
-- The time at which the logical expression is created.
created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
-- When group merging happens, the group id of the logical expression is also updated.
FOREIGN KEY (group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Runs after `representative_group_id` is updated on a `relation_groups` row and repoints
-- every logical expression whose `group_id` equals the old representative to the new one.
-- NOTE(review): with `OR REPLACE`, SQLite deletes pre-existing rows that would violate a
-- uniqueness constraint instead of aborting — confirm that this is the intended behavior.
CREATE TRIGGER update_logical_expressions_relation_group_ids
AFTER UPDATE OF representative_group_id ON relation_groups
BEGIN
UPDATE OR REPLACE logical_expressions SET group_id = NEW.representative_group_id WHERE group_id = OLD.representative_group_id;
END;
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,23 @@ CREATE TABLE scans (
predicate_group_id BIGINT NOT NULL,
FOREIGN KEY (logical_expression_id) REFERENCES logical_expressions (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (predicate_group_id) REFERENCES scalar_groups (representative_group_id)
FOREIGN KEY (group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (predicate_group_id) REFERENCES scalar_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Unique index on scan's data fields.
CREATE UNIQUE INDEX scans_data_fields ON scans (table_name, predicate_group_id);
CREATE UNIQUE INDEX scans_data_fields ON scans (table_name, predicate_group_id);

-- Runs after `representative_group_id` is updated on a `relation_groups` row and repoints
-- every scan whose `group_id` equals the old representative to the new one.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_scans_relation_group_ids
AFTER UPDATE OF representative_group_id ON relation_groups
BEGIN
UPDATE OR REPLACE scans SET group_id = NEW.representative_group_id WHERE group_id = OLD.representative_group_id;
END;

-- Runs after `representative_group_id` is updated on a `scalar_groups` row and repoints
-- every scan whose `predicate_group_id` equals the old representative to the new one.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_scans_scalar_group_ids
AFTER UPDATE OF representative_group_id ON scalar_groups
BEGIN
UPDATE OR REPLACE scans SET predicate_group_id = NEW.representative_group_id WHERE predicate_group_id = OLD.representative_group_id;
END;
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@ CREATE TABLE filters (

FOREIGN KEY (logical_expression_id) REFERENCES logical_expressions (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (child_group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (child_group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (predicate_group_id) REFERENCES scalar_groups (representative_group_id)
FOREIGN KEY (predicate_group_id) REFERENCES scalar_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Unique index on filter's data fields.
CREATE UNIQUE INDEX filters_data_fields ON filters (child_group_id, predicate_group_id);

-- Runs after `representative_group_id` is updated on a `relation_groups` row and repoints
-- filters from the old representative to the new one, first through `group_id` and then
-- through `child_group_id`.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_filters_relation_group_ids
AFTER UPDATE OF representative_group_id ON relation_groups
BEGIN
UPDATE OR REPLACE filters SET group_id = NEW.representative_group_id WHERE group_id = OLD.representative_group_id;
UPDATE OR REPLACE filters SET child_group_id = NEW.representative_group_id WHERE child_group_id = OLD.representative_group_id;
END;


-- Runs after `representative_group_id` is updated on a `scalar_groups` row and repoints
-- every filter whose `predicate_group_id` equals the old representative to the new one.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_filters_scalar_group_ids
AFTER UPDATE OF representative_group_id ON scalar_groups
BEGIN
UPDATE OR REPLACE filters SET predicate_group_id = NEW.representative_group_id WHERE predicate_group_id = OLD.representative_group_id;
END;
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,29 @@ CREATE TABLE joins (

FOREIGN KEY (logical_expression_id) REFERENCES logical_expressions (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES relation_groups (representative_group_id),
FOREIGN KEY (left_group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (group_id) REFERENCES relation_groups (id),
FOREIGN KEY (left_group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
FOREIGN KEY (right_group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (right_group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
FOREIGN KEY (condition_group_id) REFERENCES scalar_groups (representative_group_id)
FOREIGN KEY (condition_group_id) REFERENCES scalar_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE
);

-- Unique index on join's data fields.
CREATE UNIQUE INDEX joins_data_fields ON joins (join_type, left_group_id, right_group_id, condition_group_id);

-- Runs after `representative_group_id` is updated on a `relation_groups` row and repoints
-- joins from the old representative to the new one, in order: `group_id`, then
-- `left_group_id`, then `right_group_id`.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_joins_relation_group_ids
AFTER UPDATE OF representative_group_id ON relation_groups
BEGIN
UPDATE OR REPLACE joins SET group_id = NEW.representative_group_id WHERE group_id = OLD.representative_group_id;
UPDATE OR REPLACE joins SET left_group_id = NEW.representative_group_id WHERE left_group_id = OLD.representative_group_id;
UPDATE OR REPLACE joins SET right_group_id = NEW.representative_group_id WHERE right_group_id = OLD.representative_group_id;
END;


-- Runs after `representative_group_id` is updated on a `scalar_groups` row and repoints
-- every join whose `condition_group_id` equals the old representative to the new one.
-- NOTE(review): with `OR REPLACE`, a rewritten row that collides with a unique index
-- replaces (deletes) the existing row — confirm that this is the intended behavior.
CREATE TRIGGER update_joins_scalar_group_ids
AFTER UPDATE OF representative_group_id ON scalar_groups
BEGIN
UPDATE OR REPLACE joins SET condition_group_id = NEW.representative_group_id WHERE condition_group_id = OLD.representative_group_id;
END;
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CREATE TABLE projects (

FOREIGN KEY (logical_expression_id) REFERENCES logical_expressions (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (child_group_id) REFERENCES relation_groups (representative_group_id)
FOREIGN KEY (child_group_id) REFERENCES relation_groups (id)
ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (fields_id) REFERENCES _projection_fields (id)
ON UPDATE CASCADE ON DELETE CASCADE
Expand Down
Loading

0 comments on commit 00be567

Please sign in to comment.