@@ -62,7 +62,7 @@ use ruma::{
62
62
use tokio:: {
63
63
sync:: {
64
64
broadcast:: { error:: RecvError , Receiver , Sender } ,
65
- Mutex , Notify , RwLock , RwLockReadGuard ,
65
+ Mutex , Notify , RwLock , RwLockReadGuard , RwLockWriteGuard ,
66
66
} ,
67
67
time:: timeout,
68
68
} ;
@@ -249,13 +249,14 @@ impl EventCache {
249
249
// We could have received events during a previous sync; remove them all, since
250
250
// we can't know where to insert the "initial events" with respect to
251
251
// them.
252
- room_cache. inner . events . write ( ) . await . reset ( ) ;
252
+ // let mut room_events = room_cache.inner.events.write().await;
253
+ // room_events.reset();
253
254
254
- let _ = room_cache. inner . sender . send ( RoomEventCacheUpdate :: Clear ) ;
255
+ // let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
255
256
256
257
room_cache
257
258
. inner
258
- . append_events (
259
+ . replace_all_events_by (
259
260
events,
260
261
prev_batch,
261
262
Default :: default ( ) ,
@@ -482,26 +483,30 @@ impl RoomEventCacheInner {
482
483
// Ideally we'd try to reconcile existing events against those received in the
483
484
// timeline, but we're not there yet. In the meanwhile, clear the
484
485
// items from the room. TODO: implement Smart Matching™.
485
- trace ! ( "limited timeline, clearing all previous events" ) ;
486
-
487
- // Clear internal state (events, pagination tokens, etc.).
488
- self . events . write ( ) . await . reset ( ) ;
489
-
490
- // Propagate to observers.
491
- let _ = self . sender . send ( RoomEventCacheUpdate :: Clear ) ;
486
+ trace ! ( "limited timeline, clearing all previous events and pushing new events" ) ;
487
+
488
+ self . replace_all_events_by (
489
+ timeline. events ,
490
+ timeline. prev_batch ,
491
+ account_data,
492
+ ephemeral,
493
+ ambiguity_changes,
494
+ )
495
+ . await ?;
496
+ } else {
497
+ // Add all the events to the backend.
498
+ trace ! ( "adding new events" ) ;
499
+
500
+ self . append_new_events (
501
+ timeline. events ,
502
+ timeline. prev_batch ,
503
+ account_data,
504
+ ephemeral,
505
+ ambiguity_changes,
506
+ )
507
+ . await ?;
492
508
}
493
509
494
- // Add all the events to the backend.
495
- trace ! ( "adding new events" ) ;
496
- self . append_events (
497
- timeline. events ,
498
- timeline. prev_batch ,
499
- account_data,
500
- ephemeral,
501
- ambiguity_changes,
502
- )
503
- . await ?;
504
-
505
510
Ok ( ( ) )
506
511
}
507
512
@@ -511,15 +516,72 @@ impl RoomEventCacheInner {
511
516
Ok ( ( ) )
512
517
}
513
518
519
+ // Remove existing events, and append a set of events to the room cache and
520
+ // storage, notifying observers.
521
+ async fn replace_all_events_by (
522
+ & self ,
523
+ events : Vec < SyncTimelineEvent > ,
524
+ prev_batch : Option < String > ,
525
+ account_data : Vec < Raw < AnyRoomAccountDataEvent > > ,
526
+ ephemeral : Vec < Raw < AnySyncEphemeralRoomEvent > > ,
527
+ ambiguity_changes : BTreeMap < OwnedEventId , AmbiguityChange > ,
528
+ ) -> Result < ( ) > {
529
+ // Acquire the lock.
530
+ let mut room_events = self . events . write ( ) . await ;
531
+
532
+ // Reset the events.
533
+ room_events. reset ( ) ;
534
+
535
+ // Propagate to observers.
536
+ let _ = self . sender . send ( RoomEventCacheUpdate :: Clear ) ;
537
+
538
+ // Push the new events.
539
+ self . append_events_locked_impl (
540
+ Some ( room_events) ,
541
+ events,
542
+ prev_batch,
543
+ account_data,
544
+ ephemeral,
545
+ ambiguity_changes,
546
+ )
547
+ . await
548
+ }
549
+
514
550
/// Append a set of events to the room cache and storage, notifying
515
551
/// observers.
516
- async fn append_events (
552
+ async fn append_new_events (
517
553
& self ,
518
554
events : Vec < SyncTimelineEvent > ,
519
555
prev_batch : Option < String > ,
520
556
account_data : Vec < Raw < AnyRoomAccountDataEvent > > ,
521
557
ephemeral : Vec < Raw < AnySyncEphemeralRoomEvent > > ,
522
558
ambiguity_changes : BTreeMap < OwnedEventId , AmbiguityChange > ,
559
+ ) -> Result < ( ) > {
560
+ // Push the new events.
561
+ self . append_events_locked_impl (
562
+ None ,
563
+ events,
564
+ prev_batch,
565
+ account_data,
566
+ ephemeral,
567
+ ambiguity_changes,
568
+ )
569
+ . await
570
+ }
571
+
572
+ /// Append a set of events, with an attached lock.
573
+ ///
574
+ /// If the lock `room_events` is `None`, one will be created.
575
+ ///
576
+ /// This is a private implementation. It must not be exposed publicly.
577
+ async fn append_events_locked_impl (
578
+ & self ,
579
+ room_events : Option < RwLockWriteGuard < ' _ , RoomEvents > > ,
580
+ events : Vec < SyncTimelineEvent > ,
581
+ prev_batch : Option < String > ,
582
+ account_data : Vec < Raw < AnyRoomAccountDataEvent > > ,
583
+ ephemeral : Vec < Raw < AnySyncEphemeralRoomEvent > > ,
584
+ ambiguity_changes : BTreeMap < OwnedEventId , AmbiguityChange > ,
523
585
) -> Result < ( ) > {
524
586
if events. is_empty ( )
525
587
&& prev_batch. is_none ( )
@@ -530,11 +592,14 @@ impl RoomEventCacheInner {
530
592
return Ok ( ( ) ) ;
531
593
}
532
594
595
+ let mut room_events = match room_events {
596
+ Some ( room_events) => room_events,
597
+ None => self . events . write ( ) . await ,
598
+ } ;
599
+
533
600
// Add the previous back-pagination token (if present), followed by the timeline
534
601
// events themselves.
535
602
{
536
- let mut room_events = self . events . write ( ) . await ;
537
-
538
603
if let Some ( prev_token) = & prev_batch {
539
604
room_events. push_gap ( Gap { prev_token : PaginationToken ( prev_token. clone ( ) ) } ) ;
540
605
}
0 commit comments