Skip to content

Commit 5cbdcae

Browse files
authored
incr-join, find_updates: avoid unncecessary clones & use partition (#988)
* incr-join, find_updates: avoid unncecessary clones & use partition * JoinSide: store 'Vec<PV>'s instead * address joshua & phoebe's reviews
1 parent a21b1bc commit 5cbdcae

File tree

1 file changed

+56
-101
lines changed

1 file changed

+56
-101
lines changed

crates/core/src/subscription/subscription.rs

Lines changed: 56 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use spacetimedb_lib::identity::AuthCtx;
4040
use spacetimedb_lib::ProductValue;
4141
use spacetimedb_primitives::TableId;
4242
use spacetimedb_sats::db::auth::{StAccess, StTableType};
43-
use spacetimedb_sats::relation::{DbTable, Header};
43+
use spacetimedb_sats::relation::DbTable;
4444
use spacetimedb_vm::expr::{self, IndexJoin, Query, QueryExpr, SourceSet};
4545
use spacetimedb_vm::rel_ops::RelOps;
4646
use spacetimedb_vm::relation::MemTable;
@@ -209,32 +209,21 @@ pub struct IncrementalJoin {
209209

210210
/// One side of an [`IncrementalJoin`].
211211
///
212-
/// Holds the "physical" [`DbTable`] this side of the join operates on, as well
213-
/// as the [`DatabaseTableUpdate`]s pertaining that table.
212+
/// Holds the updates pertaining to a table on one side of the join.
214213
struct JoinSide {
215-
table_id: TableId,
216-
table_name: String,
217-
inserts: Vec<TableOp>,
218-
deletes: Vec<TableOp>,
214+
inserts: Vec<ProductValue>,
215+
deletes: Vec<ProductValue>,
219216
}
220217

221218
impl JoinSide {
222-
/// Return a [`DatabaseTableUpdate`] consisting of only insert operations.
223-
pub fn inserts(&self) -> DatabaseTableUpdate {
224-
DatabaseTableUpdate {
225-
table_id: self.table_id,
226-
table_name: self.table_name.clone(),
227-
ops: self.inserts.to_vec(),
228-
}
219+
/// Return a list of updates consisting of only insert operations.
220+
pub fn inserts(&self) -> Vec<ProductValue> {
221+
self.inserts.clone()
229222
}
230223

231-
/// Return a [`DatabaseTableUpdate`] with only delete operations.
232-
pub fn deletes(&self) -> DatabaseTableUpdate {
233-
DatabaseTableUpdate {
234-
table_id: self.table_id,
235-
table_name: self.table_name.clone(),
236-
ops: self.deletes.to_vec(),
237-
}
224+
/// Return a list of updates with only delete operations.
225+
pub fn deletes(&self) -> Vec<ProductValue> {
226+
self.deletes.clone()
238227
}
239228

240229
/// Does this table update include inserts?
@@ -249,18 +238,6 @@ impl JoinSide {
249238
}
250239

251240
impl IncrementalJoin {
252-
/// Construct an empty [`DatabaseTableUpdate`] with the schema of `table`
253-
/// to use as a source when pre-compiling `eval_incr` queries.
254-
fn dummy_table_update(table: &DbTable) -> DatabaseTableUpdate {
255-
let table_id = table.table_id;
256-
let table_name = table.head.table_name.clone();
257-
DatabaseTableUpdate {
258-
table_id,
259-
table_name,
260-
ops: vec![],
261-
}
262-
}
263-
264241
fn optimize_query(join: IndexJoin) -> QueryExpr {
265242
let expr = QueryExpr::from(join);
266243
// Because (at least) one of the two tables will be a `MemTable`,
@@ -313,21 +290,15 @@ impl IncrementalJoin {
313290
.context("expected a physical database table")?
314291
.clone();
315292

316-
let (virtual_index_plan, _sources) =
317-
with_delta_table(join.clone(), Some(Self::dummy_table_update(&index_table)), None);
293+
let (virtual_index_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), None);
318294
debug_assert_eq!(_sources.len(), 1);
319295
let virtual_index_plan = Self::optimize_query(virtual_index_plan);
320296

321-
let (virtual_probe_plan, _sources) =
322-
with_delta_table(join.clone(), None, Some(Self::dummy_table_update(&probe_table)));
297+
let (virtual_probe_plan, _sources) = with_delta_table(join.clone(), None, Some(Vec::new()));
323298
debug_assert_eq!(_sources.len(), 1);
324299
let virtual_probe_plan = Self::optimize_query(virtual_probe_plan);
325300

326-
let (virtual_plan, _sources) = with_delta_table(
327-
join.clone(),
328-
Some(Self::dummy_table_update(&index_table)),
329-
Some(Self::dummy_table_update(&probe_table)),
330-
);
301+
let (virtual_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), Some(Vec::new()));
331302
debug_assert_eq!(_sources.len(), 2);
332303
let virtual_plan = virtual_plan.to_inner_join();
333304

@@ -360,46 +331,53 @@ impl IncrementalJoin {
360331
&self,
361332
updates: impl IntoIterator<Item = &'a DatabaseTableUpdate>,
362333
) -> Option<(JoinSide, JoinSide)> {
363-
let mut lhs_ops = Vec::new();
364-
let mut rhs_ops = Vec::new();
334+
let mut lhs_inserts = Vec::new();
335+
let mut lhs_deletes = Vec::new();
336+
let mut rhs_inserts = Vec::new();
337+
let mut rhs_deletes = Vec::new();
338+
339+
// Partitions `updates` into `deletes` and `inserts`.
340+
let partition_into = |deletes: &mut Vec<_>, inserts: &mut Vec<_>, updates: &DatabaseTableUpdate| {
341+
for update in &updates.ops {
342+
if update.op_type == 0 {
343+
&mut *deletes
344+
} else {
345+
&mut *inserts
346+
}
347+
.push(update.row.clone());
348+
}
349+
};
365350

351+
// Partitions all updates into the `(l|r)hs_(insert|delete)_ops` above.
366352
for update in updates {
367353
if update.table_id == self.lhs.table_id {
368-
lhs_ops.extend(update.ops.iter().cloned());
354+
partition_into(&mut lhs_deletes, &mut lhs_inserts, update);
369355
} else if update.table_id == self.rhs.table_id {
370-
rhs_ops.extend(update.ops.iter().cloned());
356+
partition_into(&mut rhs_deletes, &mut rhs_inserts, update);
371357
}
372358
}
373359

374-
if lhs_ops.is_empty() && rhs_ops.is_empty() {
360+
// No updates at all? Return `None`.
361+
if [&lhs_inserts, &lhs_deletes, &rhs_inserts, &rhs_deletes]
362+
.iter()
363+
.all(|ops| ops.is_empty())
364+
{
375365
return None;
376366
}
377367

378-
let lhs = JoinSide {
379-
table_id: self.lhs.table_id,
380-
table_name: self.lhs.head.table_name.clone(),
381-
inserts: lhs_ops.iter().filter(|op| op.op_type == 1).cloned().collect(),
382-
deletes: lhs_ops.iter().filter(|op| op.op_type == 0).cloned().collect(),
383-
};
384-
385-
let rhs = JoinSide {
386-
table_id: self.rhs.table_id,
387-
table_name: self.rhs.head.table_name.clone(),
388-
inserts: rhs_ops.iter().filter(|op| op.op_type == 1).cloned().collect(),
389-
deletes: rhs_ops.iter().filter(|op| op.op_type == 0).cloned().collect(),
390-
};
391-
392-
Some((lhs, rhs))
368+
// Stich together the `JoinSide`s.
369+
let join_side = |deletes, inserts| JoinSide { deletes, inserts };
370+
Some((join_side(lhs_deletes, lhs_inserts), join_side(rhs_deletes, rhs_inserts)))
393371
}
394372

395373
/// Evaluate join plan for lhs updates.
396374
fn eval_lhs(
397375
&self,
398376
db: &RelationalDB,
399377
tx: &Tx,
400-
lhs: DatabaseTableUpdate,
378+
lhs: Vec<ProductValue>,
401379
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
402-
let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs);
380+
let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs);
403381
let mut sources = SourceSet::default();
404382
sources.add_mem_table(lhs);
405383
eval_updates(db, tx, self.plan_for_delta_lhs(), sources)
@@ -410,9 +388,9 @@ impl IncrementalJoin {
410388
&self,
411389
db: &RelationalDB,
412390
tx: &Tx,
413-
rhs: DatabaseTableUpdate,
391+
rhs: Vec<ProductValue>,
414392
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
415-
let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs);
393+
let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs);
416394
let mut sources = SourceSet::default();
417395
sources.add_mem_table(rhs);
418396
eval_updates(db, tx, self.plan_for_delta_rhs(), sources)
@@ -423,11 +401,11 @@ impl IncrementalJoin {
423401
&self,
424402
db: &RelationalDB,
425403
tx: &Tx,
426-
lhs: DatabaseTableUpdate,
427-
rhs: DatabaseTableUpdate,
404+
lhs: Vec<ProductValue>,
405+
rhs: Vec<ProductValue>,
428406
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
429-
let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs);
430-
let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs);
407+
let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs);
408+
let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs);
431409
let mut sources = SourceSet::default();
432410
let (index_side, probe_side) = if self.return_index_rows { (lhs, rhs) } else { (rhs, lhs) };
433411
sources.add_mem_table(index_side);
@@ -571,39 +549,25 @@ impl IncrementalJoin {
571549
}
572550
}
573551

574-
/// Construct a [`MemTable`] containing the updates from `delta`,
575-
/// which must be derived from a table with `head` and `table_access`.
576-
fn to_mem_table(head: Arc<Header>, table_access: StAccess, delta: DatabaseTableUpdate) -> MemTable {
577-
MemTable::new(
578-
head,
579-
table_access,
580-
delta.ops.into_iter().map(|op| op.row).collect::<Vec<_>>(),
581-
)
582-
}
583-
584552
/// Replace an [IndexJoin]'s scan or fetch operation with a delta table.
585553
/// A delta table consists purely of updates or changes to the base table.
586554
fn with_delta_table(
587555
mut join: IndexJoin,
588-
index_side: Option<DatabaseTableUpdate>,
589-
probe_side: Option<DatabaseTableUpdate>,
556+
index_side: Option<Vec<ProductValue>>,
557+
probe_side: Option<Vec<ProductValue>>,
590558
) -> (IndexJoin, SourceSet) {
591559
let mut sources = SourceSet::default();
592560

593561
if let Some(index_side) = index_side {
594562
let head = join.index_side.head().clone();
595563
let table_access = join.index_side.table_access();
596-
let mem_table = to_mem_table(head, table_access, index_side);
597-
let source_expr = sources.add_mem_table(mem_table);
598-
join.index_side = source_expr;
564+
join.index_side = sources.add_mem_table(MemTable::new(head, table_access, index_side));
599565
}
600566

601567
if let Some(probe_side) = probe_side {
602568
let head = join.probe_side.source.head().clone();
603569
let table_access = join.probe_side.source.table_access();
604-
let mem_table = to_mem_table(head, table_access, probe_side);
605-
let source_expr = sources.add_mem_table(mem_table);
606-
join.probe_side.source = source_expr;
570+
join.probe_side.source = sources.add_mem_table(MemTable::new(head, table_access, probe_side));
607571
}
608572

609573
(join, sources)
@@ -720,7 +684,6 @@ pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) ->
720684
mod tests {
721685
use super::*;
722686
use crate::db::relational_db::tests_utils::make_test_db;
723-
use crate::host::module_host::TableOp;
724687
use crate::sql::compiler::compile_sql;
725688
use spacetimedb_lib::error::ResultTest;
726689
use spacetimedb_sats::relation::{DbTable, FieldName};
@@ -736,7 +699,7 @@ mod tests {
736699
// Create table [lhs] with index on [b]
737700
let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
738701
let indexes = &[(1.into(), "b")];
739-
let lhs_id = db.create_table_for_test("lhs", schema, indexes)?;
702+
let _ = db.create_table_for_test("lhs", schema, indexes)?;
740703

741704
// Create table [rhs] with index on [b, c]
742705
let schema = &[
@@ -766,11 +729,7 @@ mod tests {
766729
};
767730

768731
// Create an insert for an incremental update.
769-
let delta = DatabaseTableUpdate {
770-
table_id: lhs_id,
771-
table_name: String::from("lhs"),
772-
ops: vec![TableOp::insert(product![0u64, 0u64])],
773-
};
732+
let delta = vec![product![0u64, 0u64]];
774733

775734
// Optimize the query plan for the incremental update.
776735
let (expr, _sources) = with_delta_table(join, Some(delta), None);
@@ -834,7 +793,7 @@ mod tests {
834793
("d", AlgebraicType::U64),
835794
];
836795
let indexes = &[(0.into(), "b"), (1.into(), "c")];
837-
let rhs_id = db.create_table_for_test("rhs", schema, indexes)?;
796+
let _ = db.create_table_for_test("rhs", schema, indexes)?;
838797

839798
let tx = db.begin_tx();
840799
// Should generate an index join since there is an index on `lhs.b`.
@@ -855,11 +814,7 @@ mod tests {
855814
};
856815

857816
// Create an insert for an incremental update.
858-
let delta = DatabaseTableUpdate {
859-
table_id: rhs_id,
860-
table_name: String::from("rhs"),
861-
ops: vec![TableOp::insert(product![0u64, 0u64, 0u64])],
862-
};
817+
let delta = vec![product![0u64, 0u64, 0u64]];
863818

864819
// Optimize the query plan for the incremental update.
865820
let (expr, _sources) = with_delta_table(join, None, Some(delta));

0 commit comments

Comments
 (0)