Skip to content

ffi: Refactor timeline creation and lifetimes #5058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions benchmarks/benches/room_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use matrix_sdk_base::{
};
use matrix_sdk_sqlite::SqliteStateStore;
use matrix_sdk_test::{event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent};
use matrix_sdk_ui::{timeline::TimelineFocus, Timeline};
use matrix_sdk_ui::timeline::{TimelineBuilder, TimelineFocus};
use ruma::{
api::client::membership::get_member_events,
device_id,
Expand Down Expand Up @@ -182,7 +182,7 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
.await
.unwrap();

let timeline = Timeline::builder(&room)
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::PinnedEvents {
max_events_to_load: 100,
max_concurrent_requests: 10,
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/benches/timeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use matrix_sdk::test_utils::mocks::MatrixMockServer;
use matrix_sdk_test::{event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent};
use matrix_sdk_ui::Timeline;
use matrix_sdk_ui::timeline::TimelineBuilder;
use ruma::{
events::room::message::RoomMessageEventContentWithoutRelation, owned_room_id, owned_user_id,
EventId,
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn create_timeline_with_initial_events(c: &mut Criterion) {
BenchmarkId::new("create_timeline_with_initial_events", format!("{NUM_EVENTS} events")),
|b| {
b.to_async(&runtime).iter(|| async {
let timeline = Timeline::builder(&room)
let timeline = TimelineBuilder::new(&room)
.track_read_marker_and_receipts()
.build()
.await
Expand Down
104 changes: 81 additions & 23 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, RwLock},
sync::{Arc, OnceLock, RwLock},
time::Duration,
};

use anyhow::{anyhow, Context as _};
Expand Down Expand Up @@ -38,9 +39,12 @@ use matrix_sdk::{
store::RoomLoadSettings as SdkRoomLoadSettings,
AuthApi, AuthSession, Client as MatrixClient, SessionChange, SessionTokens,
};
use matrix_sdk_ui::notification_client::{
NotificationClient as MatrixNotificationClient,
NotificationProcessSetup as MatrixNotificationProcessSetup,
use matrix_sdk_ui::{
notification_client::{
NotificationClient as MatrixNotificationClient,
NotificationProcessSetup as MatrixNotificationProcessSetup,
},
unable_to_decrypt_hook::UtdHookManager,
};
use mime::Mime;
use ruma::{
Expand Down Expand Up @@ -94,6 +98,7 @@ use crate::{
},
sync_service::{SyncService, SyncServiceBuilder},
task_handle::TaskHandle,
utd::{UnableToDecryptDelegate, UtdHook},
utils::AsyncRuntimeDropped,
ClientError,
};
Expand Down Expand Up @@ -161,7 +166,6 @@ impl From<PushFormat> for RumaPushFormat {
#[matrix_sdk_ffi_macros::export(callback_interface)]
pub trait ClientDelegate: Sync + Send {
fn did_receive_auth_error(&self, is_soft_logout: bool);
fn did_refresh_tokens(&self);
}

#[matrix_sdk_ffi_macros::export(callback_interface)]
Expand Down Expand Up @@ -215,7 +219,8 @@ impl From<matrix_sdk::TransmissionProgress> for TransmissionProgress {
#[derive(uniffi::Object)]
pub struct Client {
pub(crate) inner: AsyncRuntimeDropped<MatrixClient>,
delegate: RwLock<Option<Arc<dyn ClientDelegate>>>,
delegate: OnceLock<Arc<dyn ClientDelegate>>,
utd_hook_manager: OnceLock<Arc<UtdHookManager>>,
session_verification_controller:
Arc<tokio::sync::RwLock<Option<SessionVerificationController>>>,
}
Expand Down Expand Up @@ -258,8 +263,9 @@ impl Client {
sdk_client.cross_process_store_locks_holder_name().to_owned();

let client = Client {
inner: AsyncRuntimeDropped::new(sdk_client),
delegate: RwLock::new(None),
inner: AsyncRuntimeDropped::new(sdk_client.clone()),
delegate: OnceLock::new(),
utd_hook_manager: OnceLock::new(),
session_verification_controller,
};

Expand Down Expand Up @@ -747,11 +753,20 @@ impl Client {
self.inner.available_sliding_sync_versions().await.into_iter().map(Into::into).collect()
}

/// Sets the [ClientDelegate] which will inform about authentication errors.
/// Returns an error if the delegate was already set.
pub fn set_delegate(
self: Arc<Self>,
delegate: Option<Box<dyn ClientDelegate>>,
) -> Option<Arc<TaskHandle>> {
delegate.map(|delegate| {
) -> Result<Option<Arc<TaskHandle>>, ClientError> {
if self.delegate.get().is_some() {
return Err(ClientError::Generic {
msg: "Delegate already initialized".to_owned(),
details: None,
});
}

Ok(delegate.map(|delegate| {
let mut session_change_receiver = self.inner.subscribe_to_session_changes();
let client_clone = self.clone();
let session_change_task = get_runtime_handle().spawn(async move {
Expand All @@ -767,9 +782,44 @@ impl Client {
}
});

*self.delegate.write().unwrap() = Some(Arc::from(delegate));
self.delegate.get_or_init(|| Arc::from(delegate));

Arc::new(TaskHandle::new(session_change_task))
})
}))
}

/// Sets the [UnableToDecryptDelegate] which will inform about UTDs.
/// Returns an error if the delegate was already set.
pub async fn set_utd_delegate(
self: Arc<Self>,
utd_delegate: Box<dyn UnableToDecryptDelegate>,
) -> Result<(), ClientError> {
if self.utd_hook_manager.get().is_some() {
return Err(ClientError::Generic {
msg: "UTD delegate already initialized".to_owned(),
details: None,
});
}

// UTDs detected before this duration may be reclassified as "late decryption"
// events (or discarded, if they get decrypted fast enough).
const UTD_HOOK_GRACE_PERIOD: Duration = Duration::from_secs(60);

let mut utd_hook_manager = UtdHookManager::new(
Arc::new(UtdHook { delegate: utd_delegate.into() }),
(*self.inner).clone(),
)
.with_max_delay(UTD_HOOK_GRACE_PERIOD);

if let Err(e) = utd_hook_manager.reload_from_store().await {
error!("Unable to reload UTD hook data from data store: {}", e);
// Carry on with the setup anyway; we shouldn't fail setup just
// because the UTD hook failed to load its data.
}

self.utd_hook_manager.get_or_init(|| Arc::new(utd_hook_manager));

Ok(())
}

pub fn session(&self) -> Result<Session, ClientError> {
Expand Down Expand Up @@ -1028,7 +1078,11 @@ impl Client {
}

pub fn rooms(&self) -> Vec<Arc<Room>> {
self.inner.rooms().into_iter().map(|room| Arc::new(Room::new(room))).collect()
self.inner
.rooms()
.into_iter()
.map(|room| Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())))
.collect()
}

/// Get a room by its ID.
Expand All @@ -1045,14 +1099,17 @@ impl Client {
pub fn get_room(&self, room_id: String) -> Result<Option<Arc<Room>>, ClientError> {
let room_id = RoomId::parse(room_id)?;
let sdk_room = self.inner.get_room(&room_id);
let room = sdk_room.map(|room| Arc::new(Room::new(room)));

let room =
sdk_room.map(|room| Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())));
Ok(room)
}

pub fn get_dm_room(&self, user_id: String) -> Result<Option<Arc<Room>>, ClientError> {
let user_id = UserId::parse(user_id)?;
let sdk_room = self.inner.get_dm_room(&user_id);
let dm = sdk_room.map(|room| Arc::new(Room::new(room)));
let dm =
sdk_room.map(|room| Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())));
Ok(dm)
}

Expand Down Expand Up @@ -1159,7 +1216,7 @@ impl Client {
pub async fn join_room_by_id(&self, room_id: String) -> Result<Arc<Room>, ClientError> {
let room_id = RoomId::parse(room_id)?;
let room = self.inner.join_room_by_id(room_id.as_ref()).await?;
Ok(Arc::new(Room::new(room)))
Ok(Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())))
}

/// Join a room by its ID or alias.
Expand All @@ -1180,7 +1237,7 @@ impl Client {
.collect::<Result<Vec<_>, _>>()?;
let room =
self.inner.join_room_by_id_or_alias(room_id.as_ref(), server_names.as_ref()).await?;
Ok(Arc::new(Room::new(room)))
Ok(Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())))
}

/// Knock on a room to join it using its ID or alias.
Expand All @@ -1194,7 +1251,7 @@ impl Client {
let server_names =
server_names.iter().map(ServerName::parse).collect::<Result<Vec<_>, _>>()?;
let room = self.inner.knock(room_id, reason, server_names).await?;
Ok(Arc::new(Room::new(room)))
Ok(Arc::new(Room::new(room, self.utd_hook_manager.get().cloned())))
}

pub async fn get_recently_visited_rooms(&self) -> Result<Vec<String>, ClientError> {
Expand Down Expand Up @@ -1288,7 +1345,10 @@ impl Client {
/// or an externally set timeout happens.**
pub async fn await_room_remote_echo(&self, room_id: String) -> Result<Arc<Room>, ClientError> {
let room_id = RoomId::parse(room_id)?;
Ok(Arc::new(Room::new(self.inner.await_room_remote_echo(&room_id).await)))
Ok(Arc::new(Room::new(
self.inner.await_room_remote_echo(&room_id).await,
self.utd_hook_manager.get().cloned(),
)))
}

/// Lets the user know whether this is an `m.login.password` based
Expand Down Expand Up @@ -1451,15 +1511,13 @@ impl From<&search_users::v3::User> for UserProfile {

impl Client {
fn process_session_change(&self, session_change: SessionChange) {
if let Some(delegate) = self.delegate.read().unwrap().clone() {
if let Some(delegate) = self.delegate.get().cloned() {
debug!("Applying session change: {session_change:?}");
get_runtime_handle().spawn_blocking(move || match session_change {
SessionChange::UnknownToken { soft_logout } => {
delegate.did_receive_auth_error(soft_logout);
}
SessionChange::TokensRefreshed => {
delegate.did_refresh_tokens();
}
SessionChange::TokensRefreshed => {}
});
} else {
debug!(
Expand Down
1 change: 1 addition & 0 deletions bindings/matrix-sdk-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod sync_service;
mod task_handle;
mod timeline;
mod tracing;
mod utd;
mod utils;
mod widget;

Expand Down
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl NotificationClient {
pub fn get_room(&self, room_id: String) -> Result<Option<Arc<Room>>, ClientError> {
let room_id = RoomId::parse(room_id)?;
let sdk_room = self.inner.get_room(&room_id);
let room = sdk_room.map(|room| Arc::new(Room::new(room)));
let room = sdk_room.map(|room| Arc::new(Room::new(room, None)));
Ok(room)
}

Expand Down
30 changes: 19 additions & 11 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use matrix_sdk::{
ComposerDraft as SdkComposerDraft, ComposerDraftType as SdkComposerDraftType, EncryptionState,
RoomHero as SdkRoomHero, RoomMemberships, RoomState,
};
use matrix_sdk_ui::timeline::{default_event_filter, RoomExt};
use matrix_sdk_ui::{
timeline::{default_event_filter, RoomExt, TimelineBuilder},
unable_to_decrypt_hook::UtdHookManager,
};
use mime::Mime;
use ruma::{
assign,
Expand Down Expand Up @@ -75,16 +78,13 @@ pub(crate) type TimelineLock = Arc<RwLock<Option<Arc<Timeline>>>>;
#[derive(uniffi::Object)]
pub struct Room {
pub(super) inner: SdkRoom,
utd_hook_manager: Option<Arc<UtdHookManager>>,
timeline: TimelineLock,
}

impl Room {
pub(crate) fn new(inner: SdkRoom) -> Self {
Room { inner, timeline: Default::default() }
}

pub(crate) fn with_timeline(inner: SdkRoom, timeline: TimelineLock) -> Self {
Room { inner, timeline }
pub(crate) fn new(inner: SdkRoom, utd_hook_manager: Option<Arc<UtdHookManager>>) -> Self {
Room { inner, timeline: Default::default(), utd_hook_manager }
}
}

Expand Down Expand Up @@ -189,7 +189,7 @@ impl Room {
&self,
configuration: TimelineConfiguration,
) -> Result<Arc<Timeline>, ClientError> {
let mut builder = matrix_sdk_ui::timeline::Timeline::builder(&self.inner);
let mut builder = matrix_sdk_ui::timeline::TimelineBuilder::new(&self.inner);

builder = builder
.with_focus(configuration.focus.try_into()?)
Expand Down Expand Up @@ -233,6 +233,14 @@ impl Room {
builder = builder.with_internal_id_prefix(internal_id_prefix);
}

if configuration.report_utds {
if let Some(utd_hook_manager) = self.utd_hook_manager.clone() {
builder = builder.with_unable_to_decrypt_hook(utd_hook_manager);
} else {
return Err(ClientError::Generic { msg: "Failed creating timeline because the configuration is set to report UTDs but no hook manager is set".to_owned(), details: None });
}
}

let timeline = builder.build().await?;

Ok(Timeline::new(timeline))
Expand Down Expand Up @@ -656,11 +664,11 @@ impl Room {
/// Mark a room as read, by attaching a read receipt on the latest event.
///
/// Note: this does NOT unset the unread flag; it's the caller's
/// responsibility to do so, if needs be.
/// responsibility to do so, if need be.
pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<(), ClientError> {
let timeline = self.timeline().await?;
let timeline = TimelineBuilder::new(&self.inner).build().await?;

timeline.mark_as_read(receipt_type).await?;
timeline.mark_as_read(receipt_type.into()).await?;
Ok(())
}

Expand Down
Loading
Loading