Skip to content

Commit 7e0bdf0

Browse files
committed
#529 WIP propvalsub index
1 parent 3b67266 commit 7e0bdf0

File tree

8 files changed

+274
-178
lines changed

8 files changed

+274
-178
lines changed

lib/src/atoms.rs

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use crate::{errors::AtomicResult, values::Value};
44

55
/// The Atom is the (non-validated) string representation of a piece of data.
6-
/// It's RichAtom sibling provides some extra methods.
76
#[derive(Clone, Debug)]
87
pub struct Atom {
98
/// The URL where the resource is located

lib/src/commit.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,14 @@ impl Commit {
352352

353353
if update_index {
354354
for atom in remove_atoms {
355-
store.remove_atom_from_index(&atom, &resource_unedited)?;
355+
store
356+
.remove_atom_from_index(&atom, &resource_unedited)
357+
.map_err(|e| format!("Error removing atom from index: {e} Atom: {e}"))?
356358
}
357359
for atom in add_atoms {
358-
store.add_atom_to_index(&atom, &resource)?;
360+
store
361+
.add_atom_to_index(&atom, &resource)
362+
.map_err(|e| format!("Error adding atom to index: {e} Atom: {e}"))?;
359363
}
360364
}
361365
Ok(resource)

lib/src/db.rs

+26-51
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
//! Persistent, ACID compliant, threadsafe to-disk store.
22
//! Powered by Sled - an embedded database.
33
4+
mod migrations;
5+
mod prop_val_sub_index;
6+
mod query_index;
7+
mod reference_index;
8+
#[cfg(test)]
9+
pub mod test;
10+
411
use std::{
512
collections::{HashMap, HashSet},
613
sync::{Arc, Mutex},
@@ -10,6 +17,7 @@ use tracing::{instrument, trace};
1017

1118
use crate::{
1219
commit::CommitResponse,
20+
db::reference_index::key_to_atom,
1321
endpoints::{default_endpoints, Endpoint},
1422
errors::{AtomicError, AtomicResult},
1523
resources::PropVals,
@@ -19,20 +27,17 @@ use crate::{
1927

2028
use self::{
2129
migrations::migrate_maybe,
30+
prop_val_sub_index::add_atom_to_prop_val_sub_index,
2231
query_index::{
2332
atom_to_indexable_atoms, check_if_atom_matches_watched_query_filters, query_indexed,
24-
update_indexed_member, watch_collection, IndexAtom, QueryFilter, END_CHAR,
33+
update_indexed_member, watch_collection, QueryFilter, END_CHAR,
2534
},
35+
reference_index::{add_atom_to_reference_index, delete_atom_from_reference_index},
2636
};
2737

2838
// A function called by the Store when a Commit is accepted
2939
type HandleCommit = Box<dyn Fn(&CommitResponse) + Send + Sync>;
3040

31-
mod migrations;
32-
mod query_index;
33-
#[cfg(test)]
34-
pub mod test;
35-
3641
/// Inside the reference_index, each value is mapped to this type.
3742
/// The String on the left represents a Property URL, and the second one is the set of subjects.
3843
pub type PropSubjectMap = HashMap<String, HashSet<String>>;
@@ -54,13 +59,15 @@ pub struct Db {
5459
default_agent: Arc<Mutex<Option<crate::agents::Agent>>>,
5560
/// Stores all resources. The Key is the Subject as a `string.as_bytes()`, the value a [PropVals]. Propvals must be serialized using [bincode].
5661
resources: sled::Tree,
57-
/// Index for all AtomicURLs, indexed by their Value. Used to speed up TPF queries. See [key_for_reference_index]
62+
/// Index of all Atoms, sorted by {Value}-{Property}-{Subject}.
63+
/// See [reference_index]
5864
reference_index: sled::Tree,
65+
/// Index sorted by property + value.
66+
/// Used for TPF queries where the property is known.
67+
prop_val_sub_index: sled::Tree,
5968
/// Stores the members of Collections, easily sortable.
60-
/// See [collections_index]
61-
members_index: sled::Tree,
62-
/// A list of all the Collections currently being used. Is used to update `members_index`.
63-
/// See [collections_index]
69+
query_index: sled::Tree,
70+
/// A list of all the Collections currently being used. Is used to update `query_index`.
6471
watched_queries: sled::Tree,
6572
/// The address where the db will be hosted, e.g. http://localhost/
6673
server_url: String,
@@ -78,14 +85,16 @@ impl Db {
7885
let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?;
7986
let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?;
8087
let reference_index = db.open_tree("reference_index")?;
81-
let members_index = db.open_tree("members_index")?;
88+
let query_index = db.open_tree("members_index")?;
89+
let prop_val_sub_index = db.open_tree("prop_val_sub_index")?;
8290
let watched_queries = db.open_tree("watched_queries")?;
8391
let store = Db {
8492
db,
8593
default_agent: Arc::new(Mutex::new(None)),
8694
resources,
8795
reference_index,
88-
members_index,
96+
query_index,
97+
prop_val_sub_index,
8998
server_url,
9099
watched_queries,
91100
endpoints: default_endpoints(),
@@ -160,7 +169,7 @@ impl Db {
160169
/// Removes all values from the indexes.
161170
pub fn clear_index(&self) -> AtomicResult<()> {
162171
self.reference_index.clear()?;
163-
self.members_index.clear()?;
172+
self.query_index.clear()?;
164173
self.watched_queries.clear()?;
165174
Ok(())
166175
}
@@ -217,8 +226,8 @@ impl Storelike for Db {
217226
#[instrument(skip(self))]
218227
fn add_atom_to_index(&self, atom: &Atom, resource: &Resource) -> AtomicResult<()> {
219228
for index_atom in atom_to_indexable_atoms(atom)? {
220-
// It's OK if this overwrites a value
221229
add_atom_to_reference_index(&index_atom, self)?;
230+
add_atom_to_prop_val_sub_index(&index_atom, self)?;
222231
// Also update the query index to keep collections performant
223232
check_if_atom_matches_watched_query_filters(self, &index_atom, atom, false, resource)
224233
.map_err(|e| {
@@ -697,42 +706,8 @@ impl Storelike for Db {
697706
}
698707
}
699708

700-
#[instrument(skip(store))]
701-
fn add_atom_to_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
702-
let _existing = store
703-
.reference_index
704-
.insert(key_for_reference_index(index_atom).as_bytes(), b"")?;
705-
Ok(())
706-
}
707-
708-
#[instrument(skip(store))]
709-
fn delete_atom_from_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
710-
store
711-
.reference_index
712-
.remove(key_for_reference_index(index_atom).as_bytes())?;
713-
Ok(())
714-
}
715-
716-
/// Constructs the Key for the index_value cache.
717-
fn key_for_reference_index(atom: &IndexAtom) -> String {
718-
format!("{}\n{}\n{}", atom.value, atom.property, atom.subject)
719-
}
720-
721-
/// Parses a Value index key string, converts it into an atom. Note that the Value of the atom will allways be a single AtomicURL here.
722-
fn key_to_atom(key: &str) -> AtomicResult<Atom> {
723-
let mut parts = key.split('\n');
724-
let val = parts.next().ok_or("Invalid key for value index")?;
725-
let prop = parts.next().ok_or("Invalid key for value index")?;
726-
let subj = parts.next().ok_or("Invalid key for value index")?;
727-
Ok(Atom::new(
728-
subj.into(),
729-
prop.into(),
730-
Value::AtomicUrl(val.into()),
731-
))
732-
}
733-
734709
fn corrupt_db_message(subject: &str) -> String {
735-
format!("Could not deserialize item {} from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export / serialize your data and import your data again.", subject)
710+
format!("Could not deserialize item {} from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export your data and import your data again.", subject)
736711
}
737712

738-
const DB_CORRUPT_MSG: &str = "Could not deserialize item from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export / serialize your data and import your data again.";
713+
const DB_CORRUPT_MSG: &str = "Could not deserialize item from database. DB is possibly corrupt, could be due to an update or a lack of migrations. Restore to a previous version, export your data and import your data again.";

lib/src/db/prop_val_sub_index.rs

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! Index sorted by {Property}-{Value}-{Subject}.
2+
3+
use tracing::instrument;
4+
5+
use crate::{errors::AtomicResult, Db};
6+
7+
use super::query_index::{IndexAtom, SEPARATION_BIT};
8+
9+
/// Finds all Atoms for a given {property}-{value} tuple.
10+
pub fn find_in_prop_val_sub_index(
11+
store: &Db,
12+
prop: &str,
13+
val: Option<&str>,
14+
) -> AtomicResult<sled::Iter> {
15+
let mut prefix: Vec<u8> = [prop.as_bytes(), &[SEPARATION_BIT]].concat();
16+
if let Some(value) = val {
17+
prefix.extend(value.as_bytes());
18+
prefix.extend(&[SEPARATION_BIT]);
19+
}
20+
Ok(store.prop_val_sub_index.scan_prefix(prefix))
21+
}
22+
23+
#[instrument(skip(store))]
24+
pub fn add_atom_to_prop_val_sub_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
25+
let _existing = store
26+
.prop_val_sub_index
27+
.insert(key_from_atom(index_atom), b"")?;
28+
Ok(())
29+
}
30+
31+
/// Constructs the Key for the prop_val_sub_index.
32+
fn key_from_atom(atom: &IndexAtom) -> Vec<u8> {
33+
[
34+
atom.property.as_bytes(),
35+
&[SEPARATION_BIT],
36+
atom.value.as_bytes(),
37+
&[SEPARATION_BIT],
38+
atom.subject.as_bytes(),
39+
]
40+
.concat()
41+
}
42+
43+
/// Parses a Value index key string, converts it into an atom.
44+
/// Note that the Value of the atom will always be a single AtomicURL here.
45+
fn key_to_index_atom(key: &[u8]) -> AtomicResult<IndexAtom> {
46+
let mut parts = key.split(|b| b == &SEPARATION_BIT);
47+
let prop = std::str::from_utf8(parts.next().ok_or("Invalid key for prop_val_sub_index")?)
48+
.map_err(|_| "Can't parse prop into string")?;
49+
let val = std::str::from_utf8(parts.next().ok_or("Invalid key for prop_val_sub_index")?)
50+
.map_err(|_| "Can't parse val into string")?;
51+
let sub = std::str::from_utf8(parts.next().ok_or("Invalid key for prop_val_sub_index")?)
52+
.map_err(|_| "Can't parse subject into string")?;
53+
Ok(IndexAtom {
54+
property: prop.into(),
55+
value: val.into(),
56+
subject: sub.into(),
57+
})
58+
}
59+
60+
#[cfg(test)]
61+
mod test {
62+
use super::*;
63+
use crate::db::query_index::IndexAtom;
64+
65+
#[test]
66+
fn round_trip() {
67+
let atom = IndexAtom {
68+
property: "http://example.com/prop".into(),
69+
value: "http://example.com/val \n hello \n".into(),
70+
subject: "http://example.com/subj".into(),
71+
};
72+
let key = key_from_atom(&atom);
73+
let atom2 = key_to_index_atom(&key).unwrap();
74+
assert_eq!(atom, atom2);
75+
}
76+
}

lib/src/db/query_index.rs

+13-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
//! The Collections Cache is used to speed up queries.
2-
//! It sorts Members by their Value, so we can quickly paginate and sort.
1+
//! The QueryIndex is used to speed up queries by persisting filtered, sorted collections.
32
//! It relies on lexicographic ordering of keys, which Sled utilizes using `scan_prefix` queries.
43
54
use crate::{
@@ -13,6 +12,7 @@ use serde::{Deserialize, Serialize};
1312
/// A subset of a full [Query].
1413
/// Represents a sorted filter on the Store.
1514
/// A Value in the `watched_collections`.
15+
/// Used as keys in the query_index.
1616
/// These are used to check whether collections have to be updated when values have changed.
1717
#[derive(Debug, Clone, Serialize, Deserialize)]
1818
pub struct QueryFilter {
@@ -34,10 +34,10 @@ impl From<&Query> for QueryFilter {
3434
}
3535
}
3636

37-
/// Differs from a Regular Atom, since the value here is always a string,
37+
/// Differs from a regular [Atom], since the value here is always a string,
3838
/// and in the case of ResourceArrays, only a _single_ subject is used for each atom.
3939
/// One IndexAtom for every member of the ResourceArray is created.
40-
#[derive(Debug, Clone)]
40+
#[derive(Debug, Clone, PartialEq, Eq)]
4141
pub struct IndexAtom {
4242
pub subject: String,
4343
pub property: String,
@@ -47,9 +47,12 @@ pub struct IndexAtom {
4747
/// Last character in lexicographic ordering
4848
pub const FIRST_CHAR: &str = "\u{0000}";
4949
pub const END_CHAR: &str = "\u{ffff}";
50+
/// We can only store one bytearray as a key in Sled.
51+
/// We separate the various items in it using this bit that's illegal in UTF-8.
52+
pub const SEPARATION_BIT: u8 = 0xff;
5053

5154
#[tracing::instrument(skip(store))]
52-
/// Performs a query on the `members_index` Tree, which is a lexicographic sorted list of all hits for QueryFilters.
55+
/// Performs a query on the `query_index` Tree, which is a lexicographic sorted list of all hits for QueryFilters.
5356
pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult<QueryResult> {
5457
// When there is no explicit start / end value passed, we use the very first and last
5558
// lexicographic characters in existence to make the range practically encompass all values.
@@ -68,9 +71,9 @@ pub fn query_indexed(store: &Db, q: &Query) -> AtomicResult<QueryResult> {
6871

6972
let iter: Box<dyn Iterator<Item = std::result::Result<(sled::IVec, sled::IVec), sled::Error>>> =
7073
if q.sort_desc {
71-
Box::new(store.members_index.range(start_key..end_key).rev())
74+
Box::new(store.query_index.range(start_key..end_key).rev())
7275
} else {
73-
Box::new(store.members_index.range(start_key..end_key))
76+
Box::new(store.query_index.range(start_key..end_key))
7477
};
7578

7679
let mut subjects: Vec<String> = vec![];
@@ -293,18 +296,14 @@ pub fn update_indexed_member(
293296
Some(subject),
294297
)?;
295298
if delete {
296-
store.members_index.remove(key)?;
299+
store.query_index.remove(key)?;
297300
} else {
298-
store.members_index.insert(key, b"")?;
301+
store.query_index.insert(key, b"")?;
299302
}
300303
Ok(())
301304
}
302305

303-
/// We can only store one bytearray as a key in Sled.
304-
/// We separate the various items in it using this bit that's illegal in UTF-8.
305-
const SEPARATION_BIT: u8 = 0xff;
306-
307-
/// Maximum string length for values in the members_index. Should be long enough to contain pretty long URLs, but not very long documents.
306+
/// Maximum string length for values in the query_index. Should be long enough to contain pretty long URLs, but not very long documents.
308307
pub const MAX_LEN: usize = 120;
309308

310309
/// Creates a key for a collection + value combination.

lib/src/db/reference_index.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//! The ReferenceIndex is for quickly finding incoming links.
2+
//! It indexes all values in the database.
3+
4+
use super::query_index::IndexAtom;
5+
use crate::{errors::AtomicResult, Atom, Db, Value};
6+
use tracing::instrument;
7+
8+
#[instrument(skip(store))]
9+
pub fn add_atom_to_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
10+
let _existing = store
11+
.reference_index
12+
.insert(key_from_atom(index_atom).as_bytes(), b"")?;
13+
Ok(())
14+
}
15+
16+
#[instrument(skip(store))]
17+
pub fn delete_atom_from_reference_index(index_atom: &IndexAtom, store: &Db) -> AtomicResult<()> {
18+
store
19+
.reference_index
20+
.remove(key_from_atom(index_atom).as_bytes())?;
21+
Ok(())
22+
}
23+
24+
/// Constructs the Key for the index_value cache.
25+
fn key_from_atom(atom: &IndexAtom) -> String {
26+
format!("{}\n{}\n{}", atom.value, atom.property, atom.subject)
27+
}
28+
29+
/// Parses a Value index key string, converts it into an atom.
30+
/// Note that the Value of the atom will always be a single AtomicURL here.
31+
pub fn key_to_atom(key: &str) -> AtomicResult<Atom> {
32+
let mut parts = key.split('\n');
33+
let val = parts.next().ok_or(ERR_MSG)?;
34+
let prop = parts.next().ok_or(ERR_MSG)?;
35+
let subj = parts.next().ok_or(ERR_MSG)?;
36+
Ok(Atom::new(
37+
subj.into(),
38+
prop.into(),
39+
Value::AtomicUrl(val.into()),
40+
))
41+
}
42+
43+
static ERR_MSG: &str = "Invalid key in value index. Try starting the server with --rebuild-index.";

0 commit comments

Comments
 (0)