Skip to content

Commit b83889d

Browse files
committed
fix(ffi): propagate initial values before the future is picked by a runtime
1 parent cfc839f commit b83889d

File tree

3 files changed

+37
-40
lines changed

3 files changed

+37
-40
lines changed

bindings/matrix-sdk-ffi/src/room.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -607,29 +607,28 @@ impl Room {
607607
})))
608608
}
609609

610-
pub fn subscribe_to_identity_status_changes(
610+
pub async fn subscribe_to_identity_status_changes(
611611
&self,
612612
listener: Box<dyn IdentityStatusChangeListener>,
613-
) -> Arc<TaskHandle> {
613+
) -> Result<Arc<TaskHandle>, ClientError> {
614614
let room = self.inner.clone();
615-
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
616-
let status_changes = room.subscribe_to_identity_status_changes().await;
617-
if let Ok(status_changes) = status_changes {
618-
// TODO: what to do with failures?
619-
let mut status_changes = pin!(status_changes);
620-
while let Some(identity_status_changes) = status_changes.next().await {
621-
listener.call(
622-
identity_status_changes
623-
.into_iter()
624-
.map(|change| {
625-
let user_id = change.user_id.to_string();
626-
IdentityStatusChange { user_id, changed_to: change.changed_to }
627-
})
628-
.collect(),
629-
);
630-
}
615+
616+
let status_changes = room.subscribe_to_identity_status_changes().await?;
617+
618+
Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
619+
let mut status_changes = pin!(status_changes);
620+
while let Some(identity_status_changes) = status_changes.next().await {
621+
listener.call(
622+
identity_status_changes
623+
.into_iter()
624+
.map(|change| {
625+
let user_id = change.user_id.to_string();
626+
IdentityStatusChange { user_id, changed_to: change.changed_to }
627+
})
628+
.collect(),
629+
);
631630
}
632-
})))
631+
}))))
633632
}
634633

635634
/// Set (or unset) a flag on the room to indicate that the user has
@@ -1032,7 +1031,7 @@ impl Room {
10321031

10331032
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
10341033
let subscription = room.observe_live_location_shares();
1035-
let mut stream = subscription.subscribe();
1034+
let stream = subscription.subscribe();
10361035
let mut pinned_stream = pin!(stream);
10371036

10381037
while let Some(event) = pinned_stream.next().await {

bindings/matrix-sdk-ffi/src/room_directory_search.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ impl RoomDirectorySearch {
137137
) -> Arc<TaskHandle> {
138138
let (initial_values, mut stream) = self.inner.read().await.results();
139139

140-
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
141-
listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset {
142-
values: initial_values.into_iter().map(Into::into).collect(),
143-
}]);
140+
listener.on_update(vec![RoomDirectorySearchEntryUpdate::Reset {
141+
values: initial_values.into_iter().map(Into::into).collect(),
142+
}]);
144143

144+
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
145145
while let Some(diffs) = stream.next().await {
146146
listener.on_update(diffs.into_iter().map(|diff| diff.into()).collect());
147147
}

bindings/matrix-sdk-ffi/src/timeline/mod.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -214,18 +214,18 @@ impl Timeline {
214214
pub async fn add_listener(&self, listener: Box<dyn TimelineListener>) -> Arc<TaskHandle> {
215215
let (timeline_items, timeline_stream) = self.inner.subscribe().await;
216216

217-
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
218-
pin_mut!(timeline_stream);
217+
// It's important that the initial items are passed *before* we forward the
218+
// stream updates, with a guaranteed ordering. Otherwise, it could
219+
// be that the listener be called before the initial items have been
220+
// handled by the caller. See #3535 for details.
219221

220-
// It's important that the initial items are passed *before* we forward the
221-
// stream updates, with a guaranteed ordering. Otherwise, it could
222-
// be that the listener be called before the initial items have been
223-
// handled by the caller. See #3535 for details.
222+
// First, pass all the items as a reset update.
223+
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
224+
values: timeline_items,
225+
}))]);
224226

225-
// First, pass all the items as a reset update.
226-
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
227-
values: timeline_items,
228-
}))]);
227+
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
228+
pin_mut!(timeline_stream);
229229

230230
// Then forward new items.
231231
while let Some(diffs) = timeline_stream.next().await {
@@ -445,7 +445,7 @@ impl Timeline {
445445
Ok(())
446446
}
447447

448-
pub fn end_poll(
448+
pub async fn end_poll(
449449
self: Arc<Self>,
450450
poll_start_event_id: String,
451451
text: String,
@@ -455,11 +455,9 @@ impl Timeline {
455455
let poll_end_event_content = UnstablePollEndEventContent::new(text, poll_start_event_id);
456456
let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content);
457457

458-
get_runtime_handle().spawn(async move {
459-
if let Err(err) = self.inner.send(event_content).await {
460-
error!("unable to end poll: {err}");
461-
}
462-
});
458+
if let Err(err) = self.inner.send(event_content).await {
459+
error!("unable to end poll: {err}");
460+
}
463461

464462
Ok(())
465463
}

0 commit comments

Comments
 (0)