Skip to content

fix(ffi): propagate initial values before the future is picked by a runtime #4820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,29 +607,28 @@ impl Room {
})))
}

pub fn subscribe_to_identity_status_changes(
pub async fn subscribe_to_identity_status_changes(
&self,
listener: Box<dyn IdentityStatusChangeListener>,
) -> Arc<TaskHandle> {
) -> Result<Arc<TaskHandle>, ClientError> {
let room = self.inner.clone();
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
let status_changes = room.subscribe_to_identity_status_changes().await;
if let Ok(status_changes) = status_changes {
// TODO: what to do with failures?
let mut status_changes = pin!(status_changes);
while let Some(identity_status_changes) = status_changes.next().await {
listener.call(
identity_status_changes
.into_iter()
.map(|change| {
let user_id = change.user_id.to_string();
IdentityStatusChange { user_id, changed_to: change.changed_to }
})
.collect(),
);
}

let status_changes = room.subscribe_to_identity_status_changes().await?;

Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
let mut status_changes = pin!(status_changes);
while let Some(identity_status_changes) = status_changes.next().await {
listener.call(
identity_status_changes
.into_iter()
.map(|change| {
let user_id = change.user_id.to_string();
IdentityStatusChange { user_id, changed_to: change.changed_to }
})
.collect(),
);
}
})))
}))))
}

/// Set (or unset) a flag on the room to indicate that the user has
Expand Down Expand Up @@ -1032,7 +1031,7 @@ impl Room {

Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
let subscription = room.observe_live_location_shares();
let mut stream = subscription.subscribe();
let stream = subscription.subscribe();
let mut pinned_stream = pin!(stream);

while let Some(event) = pinned_stream.next().await {
Expand Down
8 changes: 4 additions & 4 deletions bindings/matrix-sdk-ffi/src/room_directory_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ impl RoomDirectorySearch {
) -> Arc<TaskHandle> {
let (initial_values, mut stream) = self.inner.read().await.results();

Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset {
values: initial_values.into_iter().map(Into::into).collect(),
}]);
listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset {
values: initial_values.into_iter().map(Into::into).collect(),
}]);

Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
while let Some(diffs) = stream.next().await {
listener.on_update(diffs.into_iter().map(|diff| diff.into()).collect());
}
Expand Down
30 changes: 14 additions & 16 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,18 @@ impl Timeline {
pub async fn add_listener(&self, listener: Box<dyn TimelineListener>) -> Arc<TaskHandle> {
let (timeline_items, timeline_stream) = self.inner.subscribe().await;

Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
pin_mut!(timeline_stream);
// It's important that the initial items are passed *before* we forward the
// stream updates, with a guaranteed ordering. Otherwise, it could
// be that the listener be called before the initial items have been
// handled by the caller. See #3535 for details.

// It's important that the initial items are passed *before* we forward the
// stream updates, with a guaranteed ordering. Otherwise, it could
// be that the listener be called before the initial items have been
// handled by the caller. See #3535 for details.
// First, pass all the items as a reset update.
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
values: timeline_items,
}))]);

// First, pass all the items as a reset update.
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
values: timeline_items,
}))]);
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
pin_mut!(timeline_stream);

// Then forward new items.
while let Some(diffs) = timeline_stream.next().await {
Expand Down Expand Up @@ -446,7 +446,7 @@ impl Timeline {
Ok(())
}

pub fn end_poll(
pub async fn end_poll(
self: Arc<Self>,
poll_start_event_id: String,
text: String,
Expand All @@ -456,11 +456,9 @@ impl Timeline {
let poll_end_event_content = UnstablePollEndEventContent::new(text, poll_start_event_id);
let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content);

get_runtime_handle().spawn(async move {
if let Err(err) = self.inner.send(event_content).await {
error!("unable to end poll: {err}");
}
});
if let Err(err) = self.inner.send(event_content).await {
error!("unable to end poll: {err}");
}

Ok(())
}
Expand Down
Loading