Skip to content

Commit fe3ba2d

Browse files
committed
Fix storing of deltas by field
1 parent 5aa4424 commit fe3ba2d

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

Diff for: benches/query.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ fn bench_apply_deltas(b: &mut Bencher) {
114114
ENGINE.store_deltas(&NAME, &scope, &deltas).await.unwrap();
115115
});
116116

117-
let scope = DeltaScope::date(DATE.next_day().unwrap());
118-
119117
b.iter(move || {
120118
tokio_test::block_on(async {
119+
let scope = DeltaScope::date(DATE.next_day().unwrap());
120+
121121
let query = QueryExecution::new()
122122
.with_scope(scope)
123123
.with_pagination(*PAGINATION);

Diff for: src/storage.rs

+20-15
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub struct EntityStorage {
131131
data: Database<OwnedType<DataItemId>, SerdeBincode<DataItem>>,
132132
indices: Database<Str, SerdeBincode<Index>>,
133133
documents: Database<Str, SerdeBincode<RoaringBitmap>>,
134-
deltas: Database<OwnedType<u64>, SerdeBincode<BTreeMap<i64, StoredDelta>>>,
134+
deltas: Database<OwnedType<u64>, SerdeBincode<BTreeMap<i64, HashMap<String, StoredDelta>>>>,
135135
index_descriptors: HashMap<String, TypeDescriptor>,
136136
}
137137

@@ -441,14 +441,16 @@ impl EntityStorage {
441441
.range((Bound::Unbounded, Bound::Included(scope.timestamp)))
442442
.fold(
443443
HashMap::<String, StoredDelta>::new(),
444-
|mut acc, (_, stored_delta)| {
445-
if let Some(aggregated_delta) = acc.get_mut(&stored_delta.field_name) {
446-
aggregated_delta.before.plus(&stored_delta.before);
447-
aggregated_delta.after.plus(&stored_delta.after);
448-
aggregated_delta.affected |= &stored_delta.affected;
449-
} else {
450-
acc.insert(stored_delta.field_name.clone(), stored_delta.clone());
451-
};
444+
|mut acc, (_, stored_deltas)| {
445+
for (field_name, stored_delta) in stored_deltas {
446+
if let Some(aggregated_delta) = acc.get_mut(field_name) {
447+
aggregated_delta.before.plus(&stored_delta.before);
448+
aggregated_delta.after.plus(&stored_delta.after);
449+
aggregated_delta.affected |= &stored_delta.affected;
450+
} else {
451+
acc.insert(stored_delta.field_name.clone(), stored_delta.clone());
452+
};
453+
}
452454

453455
acc
454456
},
@@ -539,6 +541,8 @@ impl EntityStorage {
539541
let scope_id = scope.get_id();
540542
let mut current = self.deltas.get(&txn, &scope_id)?.unwrap_or_default();
541543

544+
let stored_deltas = current.entry(scope.timestamp).or_default();
545+
542546
// Iterate over the deltas to create for each field name the before and after index
543547
for delta in deltas {
544548
let type_descriptor = self
@@ -551,12 +555,13 @@ impl EntityStorage {
551555
)
552556
});
553557

554-
let stored_delta = current
555-
.entry(scope.timestamp)
556-
.or_insert(StoredDelta::from_type(
557-
delta.field_name.clone(),
558-
type_descriptor,
559-
));
558+
let stored_delta =
559+
stored_deltas
560+
.entry(delta.field_name.clone())
561+
.or_insert(StoredDelta::from_type(
562+
delta.field_name.clone(),
563+
type_descriptor,
564+
));
560565

561566
let position = id_to_position(delta.id);
562567

0 commit comments

Comments
 (0)