Skip to content

Commit

Permalink
add more docs to storage layer
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Feb 8, 2025
1 parent b5150ca commit 758671f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 36 deletions.
18 changes: 12 additions & 6 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,51 @@ use anyhow::Result;

#[trait_variant::make(Send)]
pub trait Memoize: Send + Sync + 'static {
/// Gets all logical expressions in a group.
async fn get_all_logical_exprs_in_group(
&self,
group_id: RelationalGroupId,
) -> Result<Vec<(LogicalExpressionId, Arc<LogicalExpression>)>>;

// Returns the group id of new group if merge happened.
/// Adds a logical expression to an existing group.
/// Returns the group id of new group if merge happened.
async fn add_logical_expr_to_group(
&self,
logical_expr: &LogicalExpression,
group_id: RelationalGroupId,
) -> Result<RelationalGroupId>;

// Returns the group id of group if already exists, otherwise creates a new group.
/// Adds a logical expression to the memo table.
/// Returns the group id of group if already exists, otherwise creates a new group.
async fn add_logical_expr(&self, logical_expr: &LogicalExpression)
-> Result<RelationalGroupId>;

/// Gets all scalar expressions in a group.
async fn get_all_scalar_exprs_in_group(
&self,
group_id: ScalarGroupId,
) -> Result<Vec<(ScalarExpressionId, Arc<ScalarExpression>)>>;

// Returns the group id of new group if merge happened.
/// Adds a scalar expression to an existing group.
/// Returns the group id of new group if merge happened.
async fn add_scalar_expr_to_group(
&self,
scalar_expr: &ScalarExpression,
group_id: ScalarGroupId,
) -> Result<ScalarGroupId>;

// Returns the group id of group if already exists, otherwise creates a new group.
/// Adds a scalar expression to the memo table.
/// Returns the group id of group if already exists, otherwise creates a new group.
async fn add_scalar_expr(&self, scalar_expr: &ScalarExpression) -> Result<ScalarGroupId>;

// Merges two relational groups and returns the new group id.
/// Merges two relational groups and returns the new group id.
async fn merge_relation_group(
&self,
from: RelationalGroupId,
to: RelationalGroupId,
) -> Result<RelationalGroupId>;

// Merges two scalar groups and returns the new group id.
/// Merges two scalar groups and returns the new group id.
async fn merge_scalar_group(
&self,
from: ScalarGroupId,
Expand Down
28 changes: 23 additions & 5 deletions optd-core/src/storage/memo.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! An implementation of the memo table using SQLite.
use std::{str::FromStr, sync::Arc, time::Duration};

use super::transaction::Transaction;
Expand Down Expand Up @@ -184,8 +186,9 @@ impl Memoize for SqliteMemo {
}
}

// Memoize helpers
// Helper functions for implementing the `Memoize` trait.
impl SqliteMemo {
/// Gets the representative group id of a relational group.
async fn get_representative_group_id(
&self,
db: &mut SqliteConnection,
Expand All @@ -199,6 +202,7 @@ impl SqliteMemo {
Ok(representative_group_id)
}

/// Sets the representative group id of a relational group.
async fn set_representative_group_id(
&self,
db: &mut SqliteConnection,
Expand All @@ -213,6 +217,7 @@ impl SqliteMemo {
Ok(())
}

/// Gets the representative group id of a scalar group.
async fn get_representative_scalar_group_id(
&self,
db: &mut SqliteConnection,
Expand All @@ -226,6 +231,7 @@ impl SqliteMemo {
Ok(representative_group_id)
}

/// Sets the representative group id of a scalar group.
async fn set_representative_scalar_group_id(
&self,
db: &mut SqliteConnection,
Expand All @@ -240,6 +246,10 @@ impl SqliteMemo {
Ok(())
}

/// Inserts a scalar expression into the database. If the `add_to_group_id` is `Some`,
/// we will attempt to add the scalar expression to the specified group.
/// If the scalar expression already exists in the database, the existing group id will be returned.
/// Otherwise, a new group id will be created.
async fn add_scalar_expr_to_group_inner(
&self,
scalar_expr: &ScalarExpression,
Expand Down Expand Up @@ -304,9 +314,6 @@ impl SqliteMemo {
ScalarOperatorKind::Add,
)
.await?;
println!("add: {:?}", add);
println!("scalar_expr_id: {:?}", scalar_expr_id);
println!("group_id: {:?}", group_id);

sqlx::query_scalar("INSERT INTO scalar_adds (scalar_expression_id, group_id, left_group_id, right_group_id) VALUES ($1, $2, $3, $4) ON CONFLICT DO UPDATE SET group_id = group_id RETURNING group_id")
.bind(scalar_expr_id)
Expand Down Expand Up @@ -351,7 +358,7 @@ impl SqliteMemo {
Ok(inserted_group_id)
}

/// Inserts a scalar expression into the database.
/// Inserts an entry into the `scalar_expressions` table.
async fn insert_into_scalar_expressions(
db: &mut SqliteConnection,
scalar_expr_id: ScalarExpressionId,
Expand All @@ -368,6 +375,7 @@ impl SqliteMemo {
Ok(())
}

/// Removes a dangling scalar expression from the `scalar_expressions` table.
async fn remove_dangling_scalar_expr(
&self,
db: &mut SqliteConnection,
Expand All @@ -380,6 +388,10 @@ impl SqliteMemo {
Ok(())
}

/// Inserts a logical expression into the memo table. If the `add_to_group_id` is `Some`,
/// we will attempt to add the logical expression to the specified group.
/// If the logical expression already exists in the database, the existing group id will be returned.
/// Otherwise, a new group id will be created.
async fn add_logical_expr_to_group_inner(
&self,
logical_expr: &LogicalExpression,
Expand Down Expand Up @@ -477,6 +489,7 @@ impl SqliteMemo {
Ok(inserted_group_id)
}

/// Inserts an entry into the `logical_expressions` table.
async fn insert_into_logical_expressions(
txn: &mut SqliteConnection,
logical_expr_id: LogicalExpressionId,
Expand All @@ -493,6 +506,7 @@ impl SqliteMemo {
Ok(())
}

/// Removes a dangling logical expression from the `logical_expressions` table.
async fn remove_dangling_logical_expr(
&self,
db: &mut SqliteConnection,
Expand All @@ -507,6 +521,8 @@ impl SqliteMemo {
}

/// The SQL query to get all logical expressions in a group.
/// For each of the operators, the logical_expression_id is selected,
/// as well as the data fields in json form.
const fn get_all_logical_exprs_in_group_query() -> &'static str {
concat!(
"SELECT logical_expression_id, json_object('Scan', json_object('table_name', json(table_name), 'predicate', predicate_group_id)) as data FROM scans WHERE group_id = $1",
Expand All @@ -518,6 +534,8 @@ const fn get_all_logical_exprs_in_group_query() -> &'static str {
}

/// The SQL query to get all scalar expressions in a group.
/// For each of the operators, the scalar_expression_id is selected,
/// as well as the data fields in json form.
const fn get_all_scalar_exprs_in_group_query() -> &'static str {
concat!(
"SELECT scalar_expression_id, json_object('Constant', json(value)) as data FROM scalar_constants WHERE group_id = $1",
Expand Down
56 changes: 31 additions & 25 deletions optd-core/src/storage/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A transaction that wraps a SQLite transaction while making it easy to generate new identifiers.
use sqlx::SqliteConnection;
use std::ops::{Deref, DerefMut};

Expand All @@ -6,38 +8,18 @@ use crate::cascades::{
groups::{RelationalGroupId, ScalarGroupId},
};

/// Sequence is a unique generator for the entities in the optd storage layer.
struct Sequence;

impl Sequence {
/// Returns the current value in the sequence.
pub async fn value(db: &mut SqliteConnection) -> anyhow::Result<i64> {
let value = sqlx::query_scalar!("SELECT current_value FROM id_sequences WHERE id = 0")
.fetch_one(db)
.await?;
Ok(value)
}

/// Sets the current value of the sequence to the given value.
pub async fn set_value(db: &mut SqliteConnection, value: i64) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE id_sequences SET current_value = ? WHERE id = 0",
value
)
.execute(db)
.await?;
Ok(())
}
}

/// A transaction that wraps a SQLite transaction.
pub struct Transaction<'c> {
/// An active SQLite transaction.
txn: sqlx::Transaction<'c, sqlx::Sqlite>,
/// The current value of the sequence in the transaction.
/// The value is read from the database on transaction start and
/// persisted back to the database on commit.
current_value: i64,
}

impl Transaction<'_> {
/// Creates a new transaction.
pub async fn new(
mut txn: sqlx::Transaction<'_, sqlx::Sqlite>,
) -> anyhow::Result<Transaction<'_>> {
Expand Down Expand Up @@ -102,14 +84,38 @@ impl DerefMut for Transaction<'_> {
}
}

// TODO(alexis): This just checks the sequencing logic.
/// Sequence is a unique generator for the entities in the optd storage layer.
struct Sequence;

impl Sequence {
/// Returns the current value in the sequence.
pub async fn value(db: &mut SqliteConnection) -> anyhow::Result<i64> {
let value = sqlx::query_scalar!("SELECT current_value FROM id_sequences WHERE id = 0")
.fetch_one(db)
.await?;
Ok(value)
}

/// Sets the current value of the sequence to the given value.
pub async fn set_value(db: &mut SqliteConnection, value: i64) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE id_sequences SET current_value = ? WHERE id = 0",
value
)
.execute(db)
.await?;
Ok(())
}
}

#[cfg(test)]
mod tests {

use crate::storage::memo::SqliteMemo;

use super::*;

/// Test if the sequence is working correctly with the transaction.
#[tokio::test]
async fn test_sequence() -> anyhow::Result<()> {
let storage = SqliteMemo::new_in_memory().await?;
Expand Down

0 comments on commit 758671f

Please sign in to comment.