Skip to content

Commit 2eb6930

Browse files
committed
send queue: add an own transaction id for dependent events
The previously named `transaction_id` is also renamed to `parent_transaction_id` to make it clearer.
1 parent 0246863 commit 2eb6930

File tree

7 files changed

+119
-85
lines changed

7 files changed

+119
-85
lines changed

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,14 +1498,21 @@ impl StateStoreIntegrationTests for DynStateStore {
14981498
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());
14991499

15001500
// Save a redaction for that event.
1501-
self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
1502-
.await
1503-
.unwrap();
1501+
let child_txn = TransactionId::new();
1502+
self.save_dependent_send_queue_event(
1503+
room_id,
1504+
&txn0,
1505+
child_txn.clone(),
1506+
DependentQueuedEventKind::Redact,
1507+
)
1508+
.await
1509+
.unwrap();
15041510

15051511
// It worked.
15061512
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
15071513
assert_eq!(dependents.len(), 1);
1508-
assert_eq!(dependents[0].transaction_id, txn0);
1514+
assert_eq!(dependents[0].parent_transaction_id, txn0);
1515+
assert_eq!(dependents[0].own_transaction_id, child_txn);
15091516
assert!(dependents[0].event_id.is_none());
15101517
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
15111518

@@ -1518,13 +1525,16 @@ impl StateStoreIntegrationTests for DynStateStore {
15181525
// It worked.
15191526
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
15201527
assert_eq!(dependents.len(), 1);
1521-
assert_eq!(dependents[0].transaction_id, txn0);
1528+
assert_eq!(dependents[0].parent_transaction_id, txn0);
1529+
assert_eq!(dependents[0].own_transaction_id, child_txn);
15221530
assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id));
15231531
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
15241532

15251533
// Now remove it.
1526-
let removed =
1527-
self.remove_dependent_send_queue_event(room_id, dependents[0].id).await.unwrap();
1534+
let removed = self
1535+
.remove_dependent_send_queue_event(room_id, &dependents[0].own_transaction_id)
1536+
.await
1537+
.unwrap();
15281538
assert!(removed);
15291539

15301540
// It worked.
@@ -1538,14 +1548,20 @@ impl StateStoreIntegrationTests for DynStateStore {
15381548
.unwrap();
15391549
self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap();
15401550

1541-
self.save_dependent_send_queue_event(room_id, &txn0, DependentQueuedEventKind::Redact)
1542-
.await
1543-
.unwrap();
1551+
self.save_dependent_send_queue_event(
1552+
room_id,
1553+
&txn0,
1554+
TransactionId::new(),
1555+
DependentQueuedEventKind::Redact,
1556+
)
1557+
.await
1558+
.unwrap();
15441559
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 1);
15451560

15461561
self.save_dependent_send_queue_event(
15471562
room_id,
15481563
&txn1,
1564+
TransactionId::new(),
15491565
DependentQueuedEventKind::Edit {
15501566
new_content: SerializableEventContent::new(
15511567
&RoomMessageEventContent::text_plain("edit").into(),

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::{
1616
collections::{BTreeMap, BTreeSet, HashMap},
1717
num::NonZeroUsize,
18-
sync::{Mutex, RwLock as StdRwLock},
18+
sync::RwLock as StdRwLock,
1919
};
2020

2121
use async_trait::async_trait;
@@ -92,7 +92,6 @@ pub struct MemoryStore {
9292
custom: StdRwLock<HashMap<Vec<u8>, Vec<u8>>>,
9393
send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<QueuedEvent>>>,
9494
dependent_send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<DependentQueuedEvent>>>,
95-
dependent_send_queue_event_next_id: Mutex<usize>,
9695
}
9796

9897
// SAFETY: `new_unchecked` is safe because 20 is not zero.
@@ -124,7 +123,6 @@ impl Default for MemoryStore {
124123
custom: Default::default(),
125124
send_queue_events: Default::default(),
126125
dependent_send_queue_events: Default::default(),
127-
dependent_send_queue_event_next_id: Mutex::new(0),
128126
}
129127
}
130128
}
@@ -1005,39 +1003,31 @@ impl StateStore for MemoryStore {
10051003
async fn save_dependent_send_queue_event(
10061004
&self,
10071005
room: &RoomId,
1008-
transaction_id: &TransactionId,
1006+
parent_transaction_id: &TransactionId,
1007+
own_transaction_id: OwnedTransactionId,
10091008
content: DependentQueuedEventKind,
10101009
) -> Result<(), Self::Error> {
1011-
let id = {
1012-
let mut next_id = self.dependent_send_queue_event_next_id.lock().unwrap();
1013-
// Don't tell anyone, but sometimes I miss C++'s `x++` operator.
1014-
let id = *next_id;
1015-
*next_id += 1;
1016-
id
1017-
};
1018-
10191010
self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push(
10201011
DependentQueuedEvent {
1021-
id,
10221012
kind: content,
1023-
transaction_id: transaction_id.to_owned(),
1013+
parent_transaction_id: parent_transaction_id.to_owned(),
1014+
own_transaction_id,
10241015
event_id: None,
10251016
},
10261017
);
1027-
10281018
Ok(())
10291019
}
10301020

10311021
async fn update_dependent_send_queue_event(
10321022
&self,
10331023
room: &RoomId,
1034-
transaction_id: &TransactionId,
1024+
parent_txn_id: &TransactionId,
10351025
event_id: OwnedEventId,
10361026
) -> Result<usize, Self::Error> {
10371027
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
10381028
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
10391029
let mut num_updated = 0;
1040-
for d in dependents.iter_mut().filter(|item| item.transaction_id == transaction_id) {
1030+
for d in dependents.iter_mut().filter(|item| item.parent_transaction_id == parent_txn_id) {
10411031
d.event_id = Some(event_id.clone());
10421032
num_updated += 1;
10431033
}
@@ -1047,11 +1037,11 @@ impl StateStore for MemoryStore {
10471037
async fn remove_dependent_send_queue_event(
10481038
&self,
10491039
room: &RoomId,
1050-
id: usize,
1040+
txn_id: &TransactionId,
10511041
) -> Result<bool, Self::Error> {
10521042
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
10531043
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
1054-
if let Some(pos) = dependents.iter().position(|item| item.id == id) {
1044+
if let Some(pos) = dependents.iter().position(|item| item.own_transaction_id == txn_id) {
10551045
dependents.remove(pos);
10561046
Ok(true)
10571047
} else {

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ pub trait StateStore: AsyncTraitDeps {
459459
async fn save_dependent_send_queue_event(
460460
&self,
461461
room_id: &RoomId,
462-
transaction_id: &TransactionId,
462+
parent_txn_id: &TransactionId,
463+
own_txn_id: OwnedTransactionId,
463464
content: DependentQueuedEventKind,
464465
) -> Result<(), Self::Error>;
465466

@@ -470,7 +471,7 @@ pub trait StateStore: AsyncTraitDeps {
470471
async fn update_dependent_send_queue_event(
471472
&self,
472473
room_id: &RoomId,
473-
transaction_id: &TransactionId,
474+
parent_txn_id: &TransactionId,
474475
event_id: OwnedEventId,
475476
) -> Result<usize, Self::Error>;
476477

@@ -480,7 +481,7 @@ pub trait StateStore: AsyncTraitDeps {
480481
async fn remove_dependent_send_queue_event(
481482
&self,
482483
room: &RoomId,
483-
id: usize,
484+
own_txn_id: &TransactionId,
484485
) -> Result<bool, Self::Error>;
485486

486487
/// List all the dependent send queue events.
@@ -767,33 +768,34 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
767768
async fn save_dependent_send_queue_event(
768769
&self,
769770
room_id: &RoomId,
770-
transaction_id: &TransactionId,
771+
parent_txn_id: &TransactionId,
772+
own_txn_id: OwnedTransactionId,
771773
content: DependentQueuedEventKind,
772774
) -> Result<(), Self::Error> {
773775
self.0
774-
.save_dependent_send_queue_event(room_id, transaction_id, content)
776+
.save_dependent_send_queue_event(room_id, parent_txn_id, own_txn_id, content)
775777
.await
776778
.map_err(Into::into)
777779
}
778780

779781
async fn update_dependent_send_queue_event(
780782
&self,
781783
room_id: &RoomId,
782-
transaction_id: &TransactionId,
784+
parent_txn_id: &TransactionId,
783785
event_id: OwnedEventId,
784786
) -> Result<usize, Self::Error> {
785787
self.0
786-
.update_dependent_send_queue_event(room_id, transaction_id, event_id)
788+
.update_dependent_send_queue_event(room_id, parent_txn_id, event_id)
787789
.await
788790
.map_err(Into::into)
789791
}
790792

791793
async fn remove_dependent_send_queue_event(
792794
&self,
793795
room_id: &RoomId,
794-
id: usize,
796+
own_txn_id: &TransactionId,
795797
) -> Result<bool, Self::Error> {
796-
self.0.remove_dependent_send_queue_event(room_id, id).await.map_err(Into::into)
798+
self.0.remove_dependent_send_queue_event(room_id, own_txn_id).await.map_err(Into::into)
797799
}
798800

799801
async fn list_dependent_send_queue_events(
@@ -1262,7 +1264,7 @@ pub struct DependentQueuedEvent {
12621264
/// Unique identifier for this dependent queued event.
12631265
///
12641266
/// Useful for deletion.
1265-
pub id: usize,
1267+
pub own_transaction_id: OwnedTransactionId,
12661268

12671269
/// The kind of user intent.
12681270
pub kind: DependentQueuedEventKind,
@@ -1272,7 +1274,7 @@ pub struct DependentQueuedEvent {
12721274
/// Note: this is the transaction id used for the depended-on event, i.e.
12731275
/// the one that was originally sent and that's being modified with this
12741276
/// dependent event.
1275-
pub transaction_id: OwnedTransactionId,
1277+
pub parent_transaction_id: OwnedTransactionId,
12761278

12771279
/// If the parent event has been sent, the parent's event identifier
12781280
/// returned by the server once the local echo has been sent out.

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,7 +1575,8 @@ impl_state_store!({
15751575
async fn save_dependent_send_queue_event(
15761576
&self,
15771577
room_id: &RoomId,
1578-
transaction_id: &TransactionId,
1578+
parent_txn_id: &TransactionId,
1579+
own_txn_id: OwnedTransactionId,
15791580
content: DependentQueuedEventKind,
15801581
) -> Result<()> {
15811582
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
@@ -1596,14 +1597,11 @@ impl_state_store!({
15961597
|val| self.deserialize_value::<Vec<DependentQueuedEvent>>(&val),
15971598
)?;
15981599

1599-
// Find the next id by taking the biggest ID we had before, and add 1.
1600-
let next_id = prev.iter().fold(0, |max, item| item.id.max(max)) + 1;
1601-
16021600
// Push the new event.
16031601
prev.push(DependentQueuedEvent {
1604-
id: next_id,
16051602
kind: content,
1606-
transaction_id: transaction_id.to_owned(),
1603+
parent_transaction_id: parent_txn_id.to_owned(),
1604+
own_transaction_id: own_txn_id,
16071605
event_id: None,
16081606
});
16091607

@@ -1618,7 +1616,7 @@ impl_state_store!({
16181616
async fn update_dependent_send_queue_event(
16191617
&self,
16201618
room_id: &RoomId,
1621-
transaction_id: &TransactionId,
1619+
parent_txn_id: &TransactionId,
16221620
event_id: OwnedEventId,
16231621
) -> Result<usize> {
16241622
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
@@ -1641,7 +1639,7 @@ impl_state_store!({
16411639

16421640
// Modify all events that match.
16431641
let mut num_updated = 0;
1644-
for entry in prev.iter_mut().filter(|entry| entry.transaction_id == transaction_id) {
1642+
for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
16451643
entry.event_id = Some(event_id.clone());
16461644
num_updated += 1;
16471645
}
@@ -1654,7 +1652,11 @@ impl_state_store!({
16541652
Ok(num_updated)
16551653
}
16561654

1657-
async fn remove_dependent_send_queue_event(&self, room_id: &RoomId, id: usize) -> Result<bool> {
1655+
async fn remove_dependent_send_queue_event(
1656+
&self,
1657+
room_id: &RoomId,
1658+
txn_id: &TransactionId,
1659+
) -> Result<bool> {
16581660
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
16591661

16601662
let tx = self.inner.transaction_on_one_with_mode(
@@ -1668,7 +1670,7 @@ impl_state_store!({
16681670
// Reload the previous vector for this room.
16691671
if let Some(val) = obj.get(&encoded_key)?.await? {
16701672
let mut prev = self.deserialize_value::<Vec<DependentQueuedEvent>>(&val)?;
1671-
if let Some(pos) = prev.iter().position(|item| item.id == id) {
1673+
if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == txn_id) {
16721674
prev.remove(pos);
16731675

16741676
if prev.is_empty() {

crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ CREATE TABLE "dependent_send_queue_events" (
44
"room_id" BLOB NOT NULL,
55

66
-- This is used as both a key and a value, thus neither encrypted/decrypted/hashed.
7-
"transaction_id" BLOB NOT NULL,
7+
-- This is the transaction id for the *parent* transaction, not our own.
8+
"parent_transaction_id" BLOB NOT NULL,
9+
10+
-- This is used as both a key and a value, thus neither encrypted/decrypted/hashed.
11+
-- This is a transaction id used for the dependent event itself, not the parent.
12+
"own_transaction_id" BLOB NOT NULL,
813

914
-- Used as a value (thus encrypted/decrypted), can be null.
1015
"event_id" BLOB NULL,

0 commit comments

Comments
 (0)