@@ -23,6 +23,7 @@ use matrix_sdk::{
23
23
} ;
24
24
use ruma:: { events:: AnySyncTimelineEvent , RoomVersionId } ;
25
25
use tokio:: sync:: broadcast:: error:: RecvError ;
26
+ use tokio_stream:: wrappers:: errors:: BroadcastStreamRecvError ;
26
27
use tracing:: { info, info_span, trace, warn, Instrument , Span } ;
27
28
28
29
use super :: {
@@ -426,6 +427,44 @@ impl TimelineBuilder {
426
427
} )
427
428
} ;
428
429
430
+ // TODO: Technically this should™ be the only stream we need to listen to get
431
+ // notified when we should retry to decrypt an event. We sadly can't do that,
432
+ // since the cross-process support kills the `OlmMachine` which then in
433
+ // turn kills this stream. Once this is solved remove all the other ways we
434
+ // listen for room keys.
435
+ let room_keys_received_join_handle = {
436
+ let inner = controller. clone ( ) ;
437
+ let stream = client. encryption ( ) . room_keys_received_stream ( ) . await . expect ( "" ) ;
438
+
439
+ spawn ( async move {
440
+ pin_mut ! ( stream) ;
441
+
442
+ while let Some ( room_keys) = stream. next ( ) . await {
443
+ let session_ids = match room_keys {
444
+ Ok ( room_keys) => {
445
+ let session_ids: BTreeSet < String > = room_keys
446
+ . into_iter ( )
447
+ . filter ( |info| info. room_id == inner. room ( ) . room_id ( ) )
448
+ . map ( |info| info. session_id )
449
+ . collect ( ) ;
450
+
451
+ Some ( session_ids)
452
+ }
453
+ Err ( BroadcastStreamRecvError :: Lagged ( missed_updates) ) => {
454
+ // We lagged, let's retry to decrypt anything we have, maybe something
455
+ // was received.
456
+ warn ! ( missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline" ) ;
457
+
458
+ None
459
+ }
460
+ } ;
461
+
462
+ let room = inner. room ( ) ;
463
+ inner. retry_event_decryption ( room, session_ids) . await ;
464
+ }
465
+ } )
466
+ } ;
467
+
429
468
let timeline = Timeline {
430
469
controller,
431
470
event_cache : room_event_cache,
@@ -436,6 +475,7 @@ impl TimelineBuilder {
436
475
pinned_events_join_handle,
437
476
room_key_from_backups_join_handle,
438
477
room_key_backup_enabled_join_handle,
478
+ room_keys_received_join_handle,
439
479
local_echo_listener_handle,
440
480
_event_cache_drop_handle : event_cache_drop,
441
481
encryption_changes_handle,
0 commit comments