Skip to content

Commit 20c92b4

Browse files
mamcxcloutiertyler
authored andcommitted
Fix bug for subcription that was not executing or returning if empty (#6)
* Fix bug for subcription that was not executing or returning if empty * Clippy * Filter out deleted rows * Recover the op_type (DELETED, INSERTED) flag for the row in the subscription
1 parent 62a94fc commit 20c92b4

File tree

2 files changed

+133
-42
lines changed

2 files changed

+133
-42
lines changed

crates/core/src/subscription/query.rs

Lines changed: 102 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::db::relational_db::RelationalDB;
22
use crate::error::{DBError, SubscriptionError};
33
use crate::host::module_host::DatabaseTableUpdate;
44
use crate::sql::execute::{compile_sql, execute_single_sql};
5-
use spacetimedb_sats::relation::MemTable;
5+
use spacetimedb_sats::relation::{Column, FieldName, MemTable};
6+
use spacetimedb_sats::AlgebraicType;
67
use spacetimedb_vm::expr::{Crud, CrudExpr, DbType, QueryExpr, SourceExpr};
78

89
pub enum QueryDef {
@@ -28,16 +29,25 @@ impl Query {
2829
}
2930
}
3031

32+
pub const OP_TYPE_FIELD_NAME: &str = "__op_type";
33+
34+
//HACK: To recover the `op_type` of this particular row I add a "hidden" column `OP_TYPE_FIELD_NAME`
3135
pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
3236
let mut q = of;
3337

3438
let mut t = match &q.source {
3539
SourceExpr::MemTable(x) => MemTable::new(&x.head, &[]),
3640
SourceExpr::DbTable(table) => MemTable::new(&table.head, &[]),
3741
};
42+
t.head.fields.push(Column::new(
43+
FieldName::named(&t.head.table_name, OP_TYPE_FIELD_NAME),
44+
AlgebraicType::U8,
45+
));
3846

3947
for row in &data.ops {
40-
t.data.push(row.row.clone());
48+
let mut new = row.row.clone();
49+
new.elements.push(row.op_type.into());
50+
t.data.push(new);
4151
}
4252

4353
q.source = SourceExpr::MemTable(t);
@@ -87,10 +97,11 @@ mod tests {
8797
use crate::subscription::subscription::QuerySet;
8898
use crate::vm::tests::create_table_from_program;
8999
use crate::vm::DbProgram;
100+
use itertools::Itertools;
90101
use spacetimedb_lib::data_key::ToDataKey;
91102
use spacetimedb_lib::error::ResultTest;
92103
use spacetimedb_sats::relation::FieldName;
93-
use spacetimedb_sats::{product, BuiltinType, ProductType, ProductValue};
104+
use spacetimedb_sats::{product, BuiltinType, ProductType};
94105
use spacetimedb_vm::dsl::{db_table, mem_table, scalar};
95106
use spacetimedb_vm::operator::OpCmp;
96107

@@ -102,6 +113,7 @@ mod tests {
102113
let p = &mut DbProgram::new(&db, &mut tx);
103114

104115
let head = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
116+
105117
let row = product!(1u64, "health");
106118
let table = mem_table(head.clone(), [row.clone()]);
107119
let table_id = create_table_from_program(p, "inventory", head.clone(), &[row.clone()])?;
@@ -110,7 +122,7 @@ mod tests {
110122
db.commit_tx(tx)?;
111123

112124
let op = TableOp {
113-
op_type: 0,
125+
op_type: 1,
114126
row_pk: vec![],
115127
row,
116128
};
@@ -120,7 +132,13 @@ mod tests {
120132
table_name: "inventory".to_string(),
121133
ops: vec![op.clone()],
122134
};
123-
let q = QueryExpr::new(db_table((&schema).into(), "inventory", table_id));
135+
// For filtering out the hidden field `OP_TYPE_FIELD_NAME`
136+
let fields = &[
137+
FieldName::named("inventory", "inventory_id").into(),
138+
FieldName::named("inventory", "name").into(),
139+
];
140+
141+
let q = QueryExpr::new(db_table((&schema).into(), "inventory", table_id)).with_project(fields);
124142

125143
let q = to_mem_table(q, &data);
126144
let result = run_query(&db, &q)?;
@@ -136,16 +154,14 @@ mod tests {
136154
ops: vec![op],
137155
};
138156

139-
let q = QueryExpr::new(db_table((&schema).into(), "inventory", table_id)).with_select_cmp(
140-
OpCmp::Eq,
141-
FieldName::named("inventory", "inventory_id"),
142-
scalar(0),
143-
);
157+
let q = QueryExpr::new(db_table((&schema).into(), "inventory", table_id))
158+
.with_select_cmp(OpCmp::Eq, FieldName::named("inventory", "inventory_id"), scalar(1u64))
159+
.with_project(fields);
144160

145161
let q = to_mem_table(q, &data);
146162
let result = run_query(&db, &q)?;
147163

148-
let table = mem_table(head, Vec::<ProductValue>::new());
164+
let table = mem_table(head, vec![product!(1u64, "health")]);
149165
assert_eq!(
150166
Some(table.as_without_table_name()),
151167
result.first().map(|x| x.as_without_table_name())
@@ -192,8 +208,12 @@ mod tests {
192208
]);
193209

194210
let result = s.eval(&db)?;
195-
assert_eq!(result.tables.len(), 1, "Must return 1 table");
196-
assert_eq!(result.tables[0].ops.len(), 1, "Must return 1 row");
211+
assert_eq!(result.tables.len(), 3, "Must return 3 tables");
212+
assert_eq!(
213+
result.tables.iter().map(|x| x.ops.len()).sum::<usize>(),
214+
1,
215+
"Must return 1 row"
216+
);
197217
assert_eq!(result.tables[0].ops[0].row, row, "Must return the correct row");
198218

199219
Ok(())
@@ -251,10 +271,77 @@ mod tests {
251271
let update = DatabaseUpdate { tables: vec![data] };
252272

253273
let result = s.eval_incr(&db, &update)?;
254-
assert_eq!(result.tables.len(), 1, "Must return 1 table");
255-
assert_eq!(result.tables[0].ops.len(), 1, "Must return 1 row");
274+
assert_eq!(result.tables.len(), 3, "Must return 3 tables");
275+
assert_eq!(
276+
result.tables.iter().map(|x| x.ops.len()).sum::<usize>(),
277+
1,
278+
"Must return 1 row"
279+
);
256280
assert_eq!(result.tables[0].ops[0].row, row, "Must return the correct row");
257281

258282
Ok(())
259283
}
284+
285+
//Check that
286+
//```
287+
//SELECT * FROM table1
288+
//SELECT * FROM table2
289+
// =
290+
//SELECT * FROM table2
291+
//SELECT * FROM table1
292+
//```
293+
// return just one row irrespective of the order of the queries
294+
#[test]
295+
fn test_subscribe_commutative() -> ResultTest<()> {
296+
let (db, _tmp_dir) = make_test_db()?;
297+
let mut tx = db.begin_tx();
298+
let p = &mut DbProgram::new(&db, &mut tx);
299+
300+
let head_1 = ProductType::from_iter([("inventory_id", BuiltinType::U64), ("name", BuiltinType::String)]);
301+
let row_1 = product!(1u64, "health");
302+
let table_id_1 = create_table_from_program(p, "inventory", head_1.clone(), &[row_1.clone()])?;
303+
304+
let head_2 = ProductType::from_iter([("player_id", BuiltinType::U64), ("name", BuiltinType::String)]);
305+
let row_2 = product!(2u64, "jhon doe");
306+
let table_id_2 = create_table_from_program(p, "player", head_2, &[row_2.clone()])?;
307+
308+
let schema_1 = db.schema_for_table(&tx, table_id_1).unwrap();
309+
let schema_2 = db.schema_for_table(&tx, table_id_2).unwrap();
310+
db.commit_tx(tx)?;
311+
312+
let q_1 = QueryExpr::new(db_table((&schema_1).into(), "inventory", table_id_1));
313+
let q_2 = QueryExpr::new(db_table((&schema_2).into(), "player", table_id_2));
314+
315+
let s = QuerySet(vec![
316+
Query {
317+
queries: vec![q_1.clone()],
318+
},
319+
Query {
320+
queries: vec![q_2.clone()],
321+
},
322+
]);
323+
324+
let result_1 = s.eval(&db)?;
325+
326+
let s = QuerySet(vec![
327+
Query {
328+
queries: vec![q_2.clone()],
329+
},
330+
Query { queries: vec![q_1] },
331+
]);
332+
333+
let result_2 = s.eval(&db)?;
334+
let to_row = |of: DatabaseUpdate| {
335+
of.tables
336+
.iter()
337+
.map(|x| x.ops.iter().map(|x| x.row.clone()))
338+
.flatten()
339+
.sorted()
340+
.collect::<Vec<_>>()
341+
};
342+
343+
assert_eq!(to_row(result_1), to_row(result_2));
344+
345+
Ok(())
346+
}
260347
}

crates/core/src/subscription/subscription.rs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use spacetimedb_sats::{AlgebraicValue, BuiltinValue};
12
use std::collections::HashSet;
23

34
use super::query::Query;
45
use crate::error::DBError;
5-
use crate::subscription::query::run_query;
6+
use crate::subscription::query::{run_query, OP_TYPE_FIELD_NAME};
67
use crate::{
78
client::{ClientActorId, ClientConnectionSender},
89
db::relational_db::RelationalDB,
@@ -60,32 +61,35 @@ impl QuerySet {
6061
let mut seen = HashSet::new();
6162

6263
for query in &self.0 {
63-
for table in &database_update.tables {
64-
let mut rows = Vec::with_capacity(table.ops.len());
65-
for row in &table.ops {
66-
if seen.contains(&(table.table_id, row.row_pk.clone())) {
67-
continue;
68-
}
69-
seen.insert((table.table_id, row.row_pk.clone()));
70-
rows.push(row.clone());
71-
}
72-
73-
if rows.is_empty() {
74-
continue;
75-
}
64+
for table in database_update.tables.iter().cloned() {
65+
for q in query.queries_of_table_id(&table) {
66+
if let Some(result) = run_query(relational_db, &q)?.into_iter().find(|x| !x.data.is_empty()) {
67+
let mut table_row_operations = table.clone();
68+
table_row_operations.ops.clear();
69+
for mut row in result.data {
70+
//Hack: remove the hidden field OP_TYPE_FIELD_NAME. see `to_mem_table`
71+
// needs to be done before calculate the PK
72+
let op_type =
73+
if let Some(AlgebraicValue::Builtin(BuiltinValue::U8(op))) = row.elements.pop() {
74+
op
75+
} else {
76+
panic!("Fail to extract {OP_TYPE_FIELD_NAME}")
77+
};
78+
79+
let row_pk = RelationalDB::pk_for_row(&row);
80+
81+
//Skip rows that are already resolved in a previous subscription...
82+
if seen.contains(&(table.table_id, row_pk)) {
83+
continue;
84+
}
7685

77-
let table = DatabaseTableUpdate {
78-
table_id: table.table_id,
79-
table_name: table.table_name.clone(),
80-
ops: rows,
81-
};
86+
seen.insert((table.table_id, row_pk));
8287

83-
for q in query.queries_of_table_id(&table) {
84-
let result = run_query(relational_db, &q)?.into_iter().find(|x| !x.data.is_empty());
85-
if result.is_none() {
86-
continue;
88+
let row_pk = row_pk.to_bytes();
89+
table_row_operations.ops.push(TableOp { op_type, row_pk, row });
90+
}
91+
output.tables.push(table_row_operations);
8792
}
88-
output.tables.push(table.clone());
8993
}
9094
}
9195
}
@@ -128,9 +132,9 @@ impl QuerySet {
128132
});
129133
}
130134

131-
if table_row_operations.is_empty() {
132-
continue;
133-
}
135+
// if table_row_operations.is_empty() {
136+
// continue;
137+
// }
134138

135139
database_update.tables.push(DatabaseTableUpdate {
136140
table_id: t.table_id,

0 commit comments

Comments
 (0)