Skip to content

Commit a21b1bc

Browse files
gefjonCentril
andauthored
Nuke to_mem_table_with_op_type (#990)
* Nuke `to_mem_table_with_op_type` Rather than annotating rows with `__op_type` during `eval_incr` of selects, partition the rows before evaluation, then merge after. * Add historical comment. Co-authored-by: Mazdak Farrokhzad <[email protected]> Signed-off-by: Phoebe Goldman <[email protected]> * Remove `_replaced_source_id` --------- Signed-off-by: Phoebe Goldman <[email protected]> Co-authored-by: Mazdak Farrokhzad <[email protected]>
1 parent e270d97 commit a21b1bc

File tree

2 files changed

+69
-123
lines changed

2 files changed

+69
-123
lines changed

crates/core/src/subscription/execution_unit.rs

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::query::{self, find_op_type_col_pos, Supported, OP_TYPE_FIELD_NAME};
1+
use super::query::{self, Supported};
22
use super::subscription::{IncrementalJoin, SupportedQuery};
33
use crate::db::relational_db::{RelationalDB, Tx};
44
use crate::error::DBError;
@@ -9,11 +9,11 @@ use crate::vm::{build_query, TxMode};
99
use spacetimedb_client_api_messages::client_api::{TableRowOperation, TableUpdate};
1010
use spacetimedb_lib::bsatn::to_writer;
1111
use spacetimedb_primitives::TableId;
12-
use spacetimedb_sats::relation::{DbTable, Header};
12+
use spacetimedb_sats::relation::DbTable;
1313
use spacetimedb_vm::eval::IterRows;
1414
use spacetimedb_vm::expr::{Query, QueryExpr, SourceExpr, SourceSet};
1515
use spacetimedb_vm::rel_ops::RelOps;
16-
use spacetimedb_vm::relation::RelValue;
16+
use spacetimedb_vm::relation::{MemTable, RelValue};
1717
use std::hash::Hash;
1818

1919
/// A hash for uniquely identifying query execution units,
@@ -58,7 +58,7 @@ enum EvalIncrPlan {
5858
Semijoin(IncrementalJoin),
5959

6060
/// For single-table selects, store only one version of the plan,
61-
/// which has a single source, a [`MemTable`] produced by [`query::to_mem_table_with_op_type`].
61+
/// which has a single source, a [`MemTable`], produced by [`query::query_to_mem_table`].
6262
Select(QueryExpr),
6363
}
6464

@@ -100,7 +100,6 @@ impl From<SupportedQuery> for ExecutionUnit {
100100

101101
impl ExecutionUnit {
102102
/// Pre-compute a plan for `eval_incr` which reads from a `MemTable`
103-
/// whose rows are augmented with an `__op_type` column,
104103
/// rather than re-planning on every incremental update.
105104
fn compile_select_eval_incr(expr: &QueryExpr) -> QueryExpr {
106105
let source = expr
@@ -126,7 +125,7 @@ impl ExecutionUnit {
126125
// Some day down the line, when we have a real query planner,
127126
// we may need to provide a row count estimation that is, if not accurate,
128127
// at least less specifically inaccurate.
129-
let (eval_incr_plan, _source_set) = query::to_mem_table(expr.clone(), &table_update);
128+
let (eval_incr_plan, _source_set) = query::query_to_mem_table(expr.clone(), &table_update);
130129
debug_assert_eq!(_source_set.len(), 1);
131130

132131
eval_incr_plan
@@ -278,6 +277,21 @@ impl ExecutionUnit {
278277
}))
279278
}
280279

280+
fn eval_query_expr_against_memtable<'a>(
281+
ctx: &'a ExecutionContext,
282+
db: &'a RelationalDB,
283+
tx: &'a TxMode,
284+
mem_table: MemTable,
285+
eval_incr_plan: &'a QueryExpr,
286+
) -> Result<Box<IterRows<'a>>, DBError> {
287+
// Build a `SourceSet` containing the updates from `table`.
288+
let mut sources = SourceSet::default();
289+
sources.add_mem_table(mem_table);
290+
// Evaluate the saved plan against the new `SourceSet`,
291+
// returning an iterator over the selected rows.
292+
build_query(ctx, db, tx, eval_incr_plan, &mut sources).map_err(Into::into)
293+
}
294+
281295
fn eval_incr_query_expr<'a>(
282296
db: &RelationalDB,
283297
tx: &Tx,
@@ -289,51 +303,64 @@ impl ExecutionUnit {
289303
let tx: TxMode = tx.into();
290304

291305
let SourceExpr::MemTable {
292-
source_id: _source_id,
293306
ref header,
294307
table_access,
295308
..
296309
} = eval_incr_plan.source
297310
else {
298311
panic!("Expected MemTable in `eval_incr_plan`, but found `DbTable`");
299312
};
313+
314+
// Partition the `update` into two `MemTable`s, `(inserts, deletes)`,
315+
// so that we can remember which are which without adding a column to each row.
316+
// Previously, we used to add such a column `"__op_type: AlgebraicType::U8"`.
317+
let partition_updates = |update: &DatabaseTableUpdate| -> (Option<MemTable>, Option<MemTable>) {
318+
// Pre-allocate with capacity given by an upper bound,
319+
// because realloc is worse than over-allocing.
320+
let mut inserts = Vec::with_capacity(update.ops.len());
321+
let mut deletes = Vec::with_capacity(update.ops.len());
322+
for op in update.ops.iter() {
323+
// 0 = delete, 1 = insert
324+
if op.op_type == 0 { &mut deletes } else { &mut inserts }.push(op.row.clone());
325+
}
326+
(
327+
(!inserts.is_empty()).then(|| MemTable::new(header.clone(), table_access, inserts)),
328+
(!deletes.is_empty()).then(|| MemTable::new(header.clone(), table_access, deletes)),
329+
)
330+
};
331+
300332
let mut ops = Vec::new();
301333

302334
for table in tables.filter(|table| table.table_id == return_table) {
303-
// Build a `SourceSet` containing the updates from `table`.
304-
let mem_table = query::to_mem_table_with_op_type(header.clone(), table_access, table);
305-
let mut sources = SourceSet::default();
306-
let _source_expr = sources.add_mem_table(mem_table);
307-
debug_assert_eq!(_source_expr.source_id(), Some(_source_id));
308-
// Evaluate the saved plan against the new `SourceSet`
309-
// and capture the new row operations.
310-
let query = build_query(&ctx, db, &tx, eval_incr_plan, &mut sources)?;
311-
Self::collect_rows_remove_table_ops(&mut ops, query, header)?;
335+
// Evaluate the query separately against inserts and deletes,
336+
// so that we can pass each row to the query engine unaltered,
337+
// without forgetting which are inserts and which are deletes.
338+
// Then, collect the rows into the single `ops` vec,
339+
// restoring the appropriate `op_type`.
340+
let (inserts, deletes) = partition_updates(table);
341+
if let Some(inserts) = inserts {
342+
let query = Self::eval_query_expr_against_memtable(&ctx, db, &tx, inserts, eval_incr_plan)?;
343+
// op_type 1: insert
344+
Self::collect_rows_with_table_op(&mut ops, query, 1)?;
345+
}
346+
if let Some(deletes) = deletes {
347+
let query = Self::eval_query_expr_against_memtable(&ctx, db, &tx, deletes, eval_incr_plan)?;
348+
// op_type 0: delete
349+
Self::collect_rows_with_table_op(&mut ops, query, 0)?;
350+
}
312351
}
313352
Ok(ops)
314353
}
315354

316-
/// Convert a set of rows annotated with the `__op_type` fields into a set of [`TableOp`]s,
317-
/// and collect them into a vec `into`.
318-
fn collect_rows_remove_table_ops(
355+
/// Collect the results of `query` into a vec `into`,
356+
/// annotating each as a `TableOp` with the `op_type`.
357+
fn collect_rows_with_table_op(
319358
into: &mut Vec<TableOp>,
320359
mut query: Box<IterRows<'_>>,
321-
header: &Header,
360+
op_type: u8,
322361
) -> Result<(), DBError> {
323-
let pos_op_type = find_op_type_col_pos(header).unwrap_or_else(|| {
324-
panic!(
325-
"Failed to locate `{OP_TYPE_FIELD_NAME}` in `{}`, fields: {:?}",
326-
header.table_name,
327-
header.fields.iter().map(|x| &x.field).collect::<Vec<_>>()
328-
)
329-
});
330-
let pos_op_type = pos_op_type.idx();
331362
while let Some(row_ref) = query.next()? {
332-
let mut row = row_ref.into_product_value();
333-
let op_type =
334-
row.elements.remove(pos_op_type).into_u8().unwrap_or_else(|_| {
335-
panic!("Failed to extract `{OP_TYPE_FIELD_NAME}` from `{}`", header.table_name)
336-
});
363+
let row = row_ref.into_product_value();
337364
into.push(TableOp::new(op_type, row));
338365
}
339366
Ok(())

crates/core/src/subscription/query.rs

Lines changed: 9 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,8 @@ use once_cell::sync::Lazy;
99
use regex::Regex;
1010
use spacetimedb_lib::identity::AuthCtx;
1111
use spacetimedb_lib::Address;
12-
use spacetimedb_primitives::ColId;
13-
use spacetimedb_sats::db::auth::StAccess;
14-
use spacetimedb_sats::relation::{Column, FieldName, Header};
15-
use spacetimedb_sats::AlgebraicType;
1612
use spacetimedb_vm::expr::{self, Crud, CrudExpr, DbType, QueryExpr, SourceSet};
1713
use spacetimedb_vm::relation::MemTable;
18-
use std::sync::Arc;
1914
use std::time::Instant;
2015

2116
use super::subscription::get_all;
@@ -28,90 +23,14 @@ pub enum QueryDef {
2823
Sql(String),
2924
}
3025

31-
pub const OP_TYPE_FIELD_NAME: &str = "__op_type";
32-
33-
/// Locate the `__op_type` column in the table described by `header`,
34-
/// if it exists.
35-
///
36-
/// The current version of this function depends on the fact that
37-
/// the `__op_type` column is always the final column in the schema.
38-
/// This is true because the `__op_type` column is added by [`to_mem_table_with_op_type`] at the end,
39-
/// and never originates anywhere else.
40-
///
41-
/// If we ever change to having the `__op_type` column in any other position,
42-
/// e.g. by projecting together two `MemTables` from [`to_mem_table_with_op_type`],
43-
/// this function may need to change, possibly to:
44-
/// ```ignore
45-
/// header.find_pos_by_name(OP_TYPE_FIELD_NAME)
46-
/// ```
47-
pub fn find_op_type_col_pos(header: &Header) -> Option<ColId> {
48-
if let Some(last_col) = header.fields.last() {
49-
if last_col.field.field_name() == Some(OP_TYPE_FIELD_NAME) {
50-
return Some(ColId((header.fields.len() - 1) as u32));
51-
}
52-
}
53-
None
54-
}
55-
56-
/// Create a virtual table from a sequence of table updates.
57-
/// Add a special column __op_type to distinguish inserts and deletes.
58-
pub fn to_mem_table_with_op_type(head: Arc<Header>, table_access: StAccess, data: &DatabaseTableUpdate) -> MemTable {
59-
let mut t = MemTable::new(head, table_access, Vec::with_capacity(data.ops.len()));
60-
61-
if let Some(pos) = find_op_type_col_pos(&t.head) {
62-
for op in data.ops.iter().map(|row| {
63-
let mut new = row.row.clone();
64-
65-
match new.elements.len().cmp(&pos.idx()) {
66-
std::cmp::Ordering::Equal => {
67-
// When we enter through `ExecutionUnit::eval_incr`,
68-
// we will have a `head` computed by a previous call to `to_mem_table`,
69-
// and therefore will have an op_type column,
70-
// but the `data` will be fresh for a newly committed transaction,
71-
// and therefore the rows will not include the op_type column.
72-
// In that case, push the op_type onto the end of each row.
73-
new.elements.push(row.op_type.into());
74-
}
75-
std::cmp::Ordering::Greater => {
76-
new.elements[pos.idx()] = row.op_type.into();
77-
}
78-
std::cmp::Ordering::Less => {
79-
panic!(
80-
"Expected {} either in-bounds or as the last column, but found at position {} in {:?}",
81-
OP_TYPE_FIELD_NAME, pos, t.head,
82-
);
83-
}
84-
}
85-
86-
new
87-
}) {
88-
t.data.push(op);
89-
}
90-
} else {
91-
// TODO(perf): Eliminate this `clone_for_error` call, as we're not in an error path.
92-
let mut head = t.head.clone_for_error();
93-
head.fields.push(Column::new(
94-
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
95-
AlgebraicType::U8,
96-
t.head.fields.len().into(),
97-
));
98-
t.head = Arc::new(head);
99-
for row in &data.ops {
100-
let mut new = row.row.clone();
101-
new.elements.push(row.op_type.into());
102-
t.data.push(new);
103-
}
104-
}
105-
t
106-
}
107-
10826
/// Replace the primary (ie. `source`) table of the given [`QueryExpr`] with
10927
/// a virtual [`MemTable`] consisting of the rows in [`DatabaseTableUpdate`].
110-
///
111-
/// To be able to reify the `op_type` of the individual operations in the update,
112-
/// each virtual row is extended with a column [`OP_TYPE_FIELD_NAME`].
113-
pub fn to_mem_table(mut of: QueryExpr, data: &DatabaseTableUpdate) -> (QueryExpr, SourceSet) {
114-
let mem_table = to_mem_table_with_op_type(of.source.head().clone(), of.source.table_access(), data);
28+
pub fn query_to_mem_table(mut of: QueryExpr, data: &DatabaseTableUpdate) -> (QueryExpr, SourceSet) {
29+
let mem_table = MemTable::new(
30+
of.source.head().clone(),
31+
of.source.table_access(),
32+
data.ops.iter().map(|op| op.row.clone()).collect(),
33+
);
11534
let mut sources = SourceSet::default();
11635
let source_expr = sources.add_mem_table(mem_table);
11736
of.source = source_expr;
@@ -247,7 +166,7 @@ mod tests {
247166
use spacetimedb_sats::db::auth::{StAccess, StTableType};
248167
use spacetimedb_sats::db::def::*;
249168
use spacetimedb_sats::relation::FieldName;
250-
use spacetimedb_sats::{product, ProductType, ProductValue};
169+
use spacetimedb_sats::{product, AlgebraicType, ProductType, ProductValue};
251170
use spacetimedb_vm::dsl::{mem_table, scalar};
252171
use spacetimedb_vm::operator::OpCmp;
253172

@@ -366,7 +285,7 @@ mod tests {
366285
q: &QueryExpr,
367286
data: &DatabaseTableUpdate,
368287
) -> ResultTest<()> {
369-
let (q, sources) = to_mem_table(q.clone(), data);
288+
let (q, sources) = query_to_mem_table(q.clone(), data);
370289
let result = run_query(
371290
&ExecutionContext::default(),
372291
db,
@@ -556,7 +475,7 @@ mod tests {
556475

557476
let q = QueryExpr::new(&schema);
558477

559-
let (q, sources) = to_mem_table(q, &data);
478+
let (q, sources) = query_to_mem_table(q, &data);
560479
//Try access the private table
561480
match run_query(
562481
&ExecutionContext::default(),

0 commit comments

Comments
 (0)