Skip to content

Commit a6c1333

Browse files
committed
sdk: add a mechanism to get all room updates at once
(instead of having to subscribe to a single room in the event cache)
1 parent 6a81cec commit a6c1333

File tree

4 files changed

+261
-7
lines changed

4 files changed

+261
-7
lines changed

crates/matrix-sdk/src/client/mod.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ use futures_core::Stream;
2727
#[cfg(feature = "e2e-encryption")]
2828
use matrix_sdk_base::crypto::store::LockableCryptoStore;
2929
use matrix_sdk_base::{
30-
store::DynStateStore, sync::Notification, BaseClient, RoomInfoUpdate, RoomState,
31-
RoomStateFilter, SendOutsideWasm, SessionMeta, SyncOutsideWasm,
30+
store::DynStateStore,
31+
sync::{Notification, RoomUpdates},
32+
BaseClient, RoomInfoUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
33+
SyncOutsideWasm,
3234
};
3335
use matrix_sdk_common::instant::Instant;
3436
#[cfg(feature = "e2e-encryption")]
@@ -250,6 +252,10 @@ pub(crate) struct ClientInner {
250252
/// The sender-side of channels used to receive room updates.
251253
pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
252254

255+
/// The sender-side of a channel used to observe all the room updates of a
256+
/// sync response.
257+
pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
258+
253259
/// Whether the client should update its homeserver URL with the discovery
254260
/// information present in the login response.
255261
respect_login_well_known: bool,
@@ -296,6 +302,9 @@ impl ClientInner {
296302
event_handlers: Default::default(),
297303
notification_handlers: Default::default(),
298304
room_update_channels: Default::default(),
305+
// A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
306+
// ballast for all observers to catch up.
307+
room_updates_sender: broadcast::Sender::new(32),
299308
respect_login_well_known,
300309
sync_beat: event_listener::Event::new(),
301310
#[cfg(feature = "e2e-encryption")]
@@ -840,6 +849,12 @@ impl Client {
840849
}
841850
}
842851

852+
/// Subscribe to all updates to all rooms, whenever any has been received in
853+
/// a sync response.
854+
pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
855+
self.inner.room_updates_sender.subscribe()
856+
}
857+
843858
pub(crate) async fn notification_handlers(
844859
&self,
845860
) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {

crates/matrix-sdk/src/sync.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ impl Client {
159159
self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
160160
self.handle_sync_events(HandlerKind::ToDevice, None, to_device).await?;
161161

162+
// Ignore errors when there are no receivers.
163+
let _ = self.inner.room_updates_sender.send(rooms.clone());
164+
162165
for (room_id, room_info) in &rooms.join {
163166
let Some(room) = self.get_room(room_id) else {
164167
error!(?room_id, "Can't call event handler, room not found");

crates/matrix-sdk/tests/integration/client.rs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ use matrix_sdk::{
77
media::{MediaFormat, MediaRequest, MediaThumbnailSize},
88
sync::RoomUpdate,
99
};
10-
use matrix_sdk_base::RoomState;
10+
use matrix_sdk_base::{sync::RoomUpdates, RoomState};
1111
use matrix_sdk_test::{
12-
async_test, sync_state_event, test_json, JoinedRoomBuilder, SyncResponseBuilder,
13-
DEFAULT_TEST_ROOM_ID,
12+
async_test, sync_state_event,
13+
test_json::{
14+
self,
15+
sync::{MIXED_INVITED_ROOM_ID, MIXED_JOINED_ROOM_ID, MIXED_LEFT_ROOM_ID, MIXED_SYNC},
16+
},
17+
JoinedRoomBuilder, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1418
};
1519
use ruma::{
1620
api::client::{
@@ -414,7 +418,7 @@ async fn whoami() {
414418
}
415419

416420
#[async_test]
417-
async fn room_update_channel() {
421+
async fn test_room_update_channel() {
418422
let (client, server) = logged_in_client().await;
419423

420424
let mut rx = client.subscribe_to_room_updates(&DEFAULT_TEST_ROOM_ID);
@@ -438,6 +442,62 @@ async fn room_update_channel() {
438442
assert_eq!(updates.unread_notifications.notification_count, 11);
439443
}
440444

445+
#[async_test]
446+
async fn test_subscribe_all_room_updates() {
447+
let (client, server) = logged_in_client().await;
448+
449+
let mut rx = client.subscribe_to_all_room_updates();
450+
451+
mock_sync(&server, &*MIXED_SYNC, None).await;
452+
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
453+
client.sync_once(sync_settings).await.unwrap();
454+
455+
let room_updates = rx.recv().now_or_never().unwrap().unwrap();
456+
assert_let!(RoomUpdates { leave, join, invite } = room_updates);
457+
458+
// Check the left room updates.
459+
{
460+
assert_eq!(leave.len(), 1);
461+
462+
let (room_id, update) = leave.iter().next().unwrap();
463+
464+
assert_eq!(room_id, *MIXED_LEFT_ROOM_ID);
465+
assert!(update.state.is_empty());
466+
assert_eq!(update.timeline.events.len(), 1);
467+
assert!(update.account_data.is_empty());
468+
}
469+
470+
// Check the joined room updates.
471+
{
472+
assert_eq!(join.len(), 1);
473+
474+
let (room_id, update) = join.iter().next().unwrap();
475+
476+
assert_eq!(room_id, *MIXED_JOINED_ROOM_ID);
477+
478+
assert_eq!(update.account_data.len(), 1);
479+
assert_eq!(update.ephemeral.len(), 1);
480+
assert_eq!(update.state.len(), 1);
481+
482+
assert!(update.timeline.limited);
483+
assert_eq!(update.timeline.events.len(), 1);
484+
assert_eq!(update.timeline.prev_batch, Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned()));
485+
486+
assert_eq!(update.unread_notifications.highlight_count, 0);
487+
assert_eq!(update.unread_notifications.notification_count, 11);
488+
}
489+
490+
// Check the invited room updates.
491+
{
492+
assert_eq!(invite.len(), 1);
493+
494+
let (room_id, update) = invite.iter().next().unwrap();
495+
496+
assert_eq!(room_id, *MIXED_INVITED_ROOM_ID);
497+
assert_eq!(update.invite_state.events.len(), 2);
498+
}
499+
}
500+
441501
// Check that the `Room::is_encrypted()` is properly deduplicated, meaning we
442502
// only make a single request to the server, and that multiple calls do return
443503
// the same result.
@@ -906,7 +966,7 @@ async fn create_dm_error() {
906966
}
907967

908968
#[async_test]
909-
async fn ambiguity_changes() {
969+
async fn test_ambiguity_changes() {
910970
let (client, server) = logged_in_client().await;
911971

912972
let example_id = user_id!("@example:localhost");

testing/matrix-sdk-test/src/test_json/sync.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Complete sync responses.
22
33
use once_cell::sync::Lazy;
4+
use ruma::{room_id, RoomId};
45
use serde_json::{json, Value as JsonValue};
56

67
use crate::DEFAULT_TEST_ROOM_ID;
@@ -1230,6 +1231,181 @@ pub static LEAVE_SYNC_EVENT: Lazy<JsonValue> = Lazy::new(|| {
12301231
})
12311232
});
12321233

1234+
/// In the [`MIXED_SYNC`], the room id of the joined room.
1235+
pub static MIXED_JOINED_ROOM_ID: Lazy<&RoomId> =
1236+
Lazy::new(|| room_id!("!SVkFJHzfwvuaIEawgC:localhost"));
1237+
/// In the [`MIXED_SYNC`], the room id of the left room.
1238+
pub static MIXED_LEFT_ROOM_ID: Lazy<&RoomId> =
1239+
Lazy::new(|| room_id!("!SVkFJHzfwvuaIEawgD:localhost"));
1240+
/// In the [`MIXED_SYNC`], the room id of the invited room.
1241+
pub static MIXED_INVITED_ROOM_ID: Lazy<&RoomId> =
1242+
Lazy::new(|| room_id!("!SVkFJHzfwvuaIEawgE:localhost"));
1243+
1244+
/// A sync that contains updates to joined/invited/left rooms.
1245+
pub static MIXED_SYNC: Lazy<JsonValue> = Lazy::new(|| {
1246+
json!({
1247+
"account_data": {
1248+
"events": []
1249+
},
1250+
"to_device": {
1251+
"events": []
1252+
},
1253+
"device_lists": {
1254+
"changed": [],
1255+
"left": []
1256+
},
1257+
"presence": {
1258+
"events": []
1259+
},
1260+
"rooms": {
1261+
"join": {
1262+
*MIXED_JOINED_ROOM_ID: {
1263+
"summary": {},
1264+
"account_data": {
1265+
"events": [
1266+
{
1267+
"content": {
1268+
"event_id": "$someplace:example.org"
1269+
},
1270+
"room_id": "!roomid:room.com",
1271+
"type": "m.fully_read"
1272+
}
1273+
]
1274+
},
1275+
"ephemeral": {
1276+
"events": [
1277+
{
1278+
"content": {
1279+
"$151680659217152dPKjd:localhost": {
1280+
"m.read": {
1281+
"@example:localhost": {
1282+
"ts": 151680989
1283+
}
1284+
}
1285+
}
1286+
},
1287+
"room_id": *MIXED_JOINED_ROOM_ID,
1288+
"type": "m.receipt"
1289+
},
1290+
]
1291+
},
1292+
"state": {
1293+
"events": [
1294+
{
1295+
"content": {
1296+
"alias": "#tutorial:localhost"
1297+
},
1298+
"event_id": "$15139375513VdeRF:localhost",
1299+
"origin_server_ts": 151393755000000_u64,
1300+
"sender": "@example:localhost",
1301+
"state_key": "",
1302+
"type": "m.room.canonical_alias",
1303+
"unsigned": {
1304+
"age": 703422
1305+
}
1306+
},
1307+
]
1308+
},
1309+
"timeline": {
1310+
"events": [
1311+
{
1312+
"content": {
1313+
"body": "baba",
1314+
"format": "org.matrix.custom.html",
1315+
"formatted_body": "<strong>baba</strong>",
1316+
"msgtype": "m.text"
1317+
},
1318+
"event_id": "$152037280074GZeOm:localhost",
1319+
"origin_server_ts": 152037280000000_u64,
1320+
"sender": "@example:localhost",
1321+
"type": "m.room.message",
1322+
"unsigned": {
1323+
"age": 598971425
1324+
}
1325+
}
1326+
],
1327+
"limited": true,
1328+
"prev_batch": "t392-516_47314_0_7_1_1_1_11444_1"
1329+
},
1330+
"unread_notifications": {
1331+
"highlight_count": 0,
1332+
"notification_count": 11
1333+
}
1334+
}
1335+
},
1336+
"invite": {
1337+
*MIXED_INVITED_ROOM_ID: {
1338+
"invite_state": {
1339+
"events": [
1340+
{
1341+
"sender": "@alice:example.com",
1342+
"type": "m.room.name",
1343+
"state_key": "",
1344+
"content": {
1345+
"name": "My Room Name"
1346+
}
1347+
},
1348+
{
1349+
"sender": "@alice:example.com",
1350+
"type": "m.room.member",
1351+
"state_key": "@bob:example.com",
1352+
"content": {
1353+
"membership": "invite"
1354+
}
1355+
}
1356+
]
1357+
}
1358+
}
1359+
},
1360+
"leave": {
1361+
*MIXED_LEFT_ROOM_ID: {
1362+
"timeline": {
1363+
"events": [
1364+
{
1365+
"content": {
1366+
"membership": "leave"
1367+
},
1368+
"origin_server_ts": 158957809000000_u64,
1369+
"sender": "@example:localhost",
1370+
"state_key": "@example:localhost",
1371+
"type": "m.room.member",
1372+
"unsigned": {
1373+
"replaces_state": "$blahblah",
1374+
"prev_content": {
1375+
"avatar_url": null,
1376+
"displayname": "me",
1377+
"membership": "invite"
1378+
},
1379+
"prev_sender": "@2example:localhost",
1380+
"age": 1757
1381+
},
1382+
"event_id": "$lQQ116Y-XqcjpSUGpuz36rNntUvOSpTjuaIvmtQ2AwA"
1383+
}
1384+
],
1385+
"prev_batch": "toktok",
1386+
"limited": false
1387+
},
1388+
"state": {
1389+
"events": []
1390+
},
1391+
"account_data": {
1392+
"events": []
1393+
}
1394+
}
1395+
}
1396+
},
1397+
"groups": {
1398+
"join": {},
1399+
"invite": {},
1400+
"leave": {}
1401+
},
1402+
"device_one_time_keys_count": {
1403+
"signed_curve25519": 50
1404+
},
1405+
"next_batch": "s1380317562_757269739_1655566_503953763_334052043_1209862_55290918_65705002_101146"
1406+
})
1407+
});
1408+
12331409
pub static VOIP_SYNC: Lazy<JsonValue> = Lazy::new(|| {
12341410
json!({
12351411
"device_one_time_keys_count": {},

0 commit comments

Comments
 (0)