@@ -27,10 +27,7 @@ use tracing::{debug, instrument, trace};
27
27
28
28
use super :: {
29
29
deduplicator:: DeduplicationOutcome ,
30
- room:: {
31
- events:: { Gap , RoomEvents } ,
32
- LoadMoreEventsBackwardsOutcome , RoomEventCacheInner ,
33
- } ,
30
+ room:: { events:: Gap , LoadMoreEventsBackwardsOutcome , RoomEventCacheInner } ,
34
31
BackPaginationOutcome , EventsOrigin , Result , RoomEventCacheState , RoomEventCacheUpdate ,
35
32
} ;
36
33
use crate :: { event_cache:: EventCacheError , room:: MessagesOptions } ;
@@ -450,72 +447,6 @@ impl RoomPagination {
450
447
Ok ( backpagination_outcome)
451
448
}
452
449
453
- /// Get the latest pagination token, as stored in the room events linked
454
- /// list, or wait for it for the given amount of time.
455
- ///
456
- /// It will only wait if we *never* saw an initial previous-batch token.
457
- /// Otherwise, it will immediately skip.
458
- #[ doc( hidden) ]
459
- pub async fn get_or_wait_for_token ( & self , wait_time : Option < Duration > ) -> PaginationToken {
460
- fn get_latest ( events : & RoomEvents ) -> Option < String > {
461
- events. rchunks ( ) . find_map ( |chunk| match chunk. content ( ) {
462
- ChunkContent :: Gap ( gap) => Some ( gap. prev_token . clone ( ) ) ,
463
- ChunkContent :: Items ( ..) => None ,
464
- } )
465
- }
466
-
467
- {
468
- // Scope for the lock guard.
469
- let state = self . inner . state . read ( ) . await ;
470
-
471
- // Check if the linked chunk contains any events. If so, absence of a gap means
472
- // we've hit the start of the timeline. If not, absence of a gap
473
- // means we've never received a pagination token from sync, and we
474
- // should wait for one.
475
- let has_events = state. events ( ) . events ( ) . next ( ) . is_some ( ) ;
476
-
477
- // Fast-path: we do have a previous-batch token already.
478
- if let Some ( found) = get_latest ( state. events ( ) ) {
479
- return PaginationToken :: HasMore ( found) ;
480
- }
481
-
482
- // If we had events, and there was no gap, then we've hit the end of the
483
- // timeline.
484
- if has_events {
485
- return PaginationToken :: HitEnd ;
486
- }
487
-
488
- // If we've already waited for an initial previous-batch token before,
489
- // immediately abort.
490
- if state. waited_for_initial_prev_token {
491
- return PaginationToken :: None ;
492
- }
493
- }
494
-
495
- // If the caller didn't set a wait time, return none early.
496
- let Some ( wait_time) = wait_time else {
497
- return PaginationToken :: None ;
498
- } ;
499
-
500
- // Otherwise, wait for a notification that we received a previous-batch token.
501
- // Note the state lock is released while doing so, allowing other tasks to write
502
- // into the linked chunk.
503
- let _ = timeout ( self . inner . pagination_batch_token_notifier . notified ( ) , wait_time) . await ;
504
-
505
- let mut state = self . inner . state . write ( ) . await ;
506
-
507
- state. waited_for_initial_prev_token = true ;
508
-
509
- if let Some ( token) = get_latest ( state. events ( ) ) {
510
- PaginationToken :: HasMore ( token)
511
- } else if state. events ( ) . events ( ) . next ( ) . is_some ( ) {
512
- // See logic above, in the read lock guard scope.
513
- PaginationToken :: HitEnd
514
- } else {
515
- PaginationToken :: None
516
- }
517
- }
518
-
519
450
/// Returns a subscriber to the pagination status used for the
520
451
/// back-pagination integrated to the event cache.
521
452
pub fn status ( & self ) -> Subscriber < RoomPaginationStatus > {
@@ -545,311 +476,3 @@ impl From<Option<String>> for PaginationToken {
545
476
}
546
477
}
547
478
}
548
-
549
- #[ cfg( test) ]
550
- mod tests {
551
- // Those tests require time to work, and it does not on wasm32.
552
- #[ cfg( not( target_arch = "wasm32" ) ) ]
553
- mod time_tests {
554
- use std:: time:: { Duration , Instant } ;
555
-
556
- use assert_matches:: assert_matches;
557
- use matrix_sdk_base:: RoomState ;
558
- use matrix_sdk_test:: { async_test, event_factory:: EventFactory , ALICE } ;
559
- use ruma:: { event_id, room_id, user_id} ;
560
- use tokio:: { spawn, time:: sleep} ;
561
-
562
- use crate :: {
563
- event_cache:: { pagination:: PaginationToken , room:: events:: Gap } ,
564
- test_utils:: logged_in_client,
565
- } ;
566
-
567
- #[ async_test]
568
- async fn test_wait_no_pagination_token ( ) {
569
- let client = logged_in_client ( None ) . await ;
570
- let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
571
- client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
572
-
573
- let event_cache = client. event_cache ( ) ;
574
-
575
- event_cache. subscribe ( ) . unwrap ( ) ;
576
-
577
- let ( room_event_cache, _drop_handlers) = event_cache. for_room ( room_id) . await . unwrap ( ) ;
578
-
579
- let pagination = room_event_cache. pagination ( ) ;
580
-
581
- // If I have a room with no events, and try to get a pagination token without
582
- // waiting,
583
- let found = pagination. get_or_wait_for_token ( None ) . await ;
584
- // Then I don't get any pagination token.
585
- assert_matches ! ( found, PaginationToken :: None ) ;
586
-
587
- // Reset waited_for_initial_prev_token and event state.
588
- let _ = pagination. inner . state . write ( ) . await . reset ( ) . await . unwrap ( ) ;
589
-
590
- // If I wait for a back-pagination token for 0 seconds,
591
- let before = Instant :: now ( ) ;
592
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: default ( ) ) ) . await ;
593
- let waited = before. elapsed ( ) ;
594
- // then I don't get any,
595
- assert_matches ! ( found, PaginationToken :: None ) ;
596
- // and I haven't waited long.
597
- assert ! ( waited. as_secs( ) < 1 ) ;
598
-
599
- // Reset waited_for_initial_prev_token state.
600
- let _ = pagination. inner . state . write ( ) . await . reset ( ) . await . unwrap ( ) ;
601
-
602
- // If I wait for a back-pagination token for 1 second,
603
- let before = Instant :: now ( ) ;
604
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: from_secs ( 1 ) ) ) . await ;
605
- let waited = before. elapsed ( ) ;
606
- // then I still don't get any.
607
- assert_matches ! ( found, PaginationToken :: None ) ;
608
- // and I've waited a bit.
609
- assert ! ( waited. as_secs( ) < 2 ) ;
610
- assert ! ( waited. as_secs( ) >= 1 ) ;
611
- }
612
-
613
- #[ async_test]
614
- async fn test_wait_hit_end_of_timeline ( ) {
615
- let client = logged_in_client ( None ) . await ;
616
- let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
617
- client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
618
-
619
- let event_cache = client. event_cache ( ) ;
620
-
621
- event_cache. subscribe ( ) . unwrap ( ) ;
622
-
623
- let ( room_event_cache, _drop_handlers) = event_cache. for_room ( room_id) . await . unwrap ( ) ;
624
-
625
- let f = EventFactory :: new ( ) . room ( room_id) . sender ( * ALICE ) ;
626
- let pagination = room_event_cache. pagination ( ) ;
627
-
628
- // Add a previous event.
629
- room_event_cache
630
- . inner
631
- . state
632
- . write ( )
633
- . await
634
- . with_events_mut ( |events| {
635
- events. push_events ( [ f
636
- . text_msg ( "this is the start of the timeline" )
637
- . into_event ( ) ] ) ;
638
- } )
639
- . await
640
- . unwrap ( ) ;
641
-
642
- // If I have a room with events, and try to get a pagination token without
643
- // waiting,
644
- let found = pagination. get_or_wait_for_token ( None ) . await ;
645
- // I've reached the start of the timeline.
646
- assert_matches ! ( found, PaginationToken :: HitEnd ) ;
647
-
648
- // If I wait for a back-pagination token for 0 seconds,
649
- let before = Instant :: now ( ) ;
650
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: default ( ) ) ) . await ;
651
- let waited = before. elapsed ( ) ;
652
- // Then I still have reached the start of the timeline.
653
- assert_matches ! ( found, PaginationToken :: HitEnd ) ;
654
- // and I've waited very little.
655
- assert ! ( waited. as_secs( ) < 1 ) ;
656
-
657
- // If I wait for a back-pagination token for 1 second,
658
- let before = Instant :: now ( ) ;
659
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: from_secs ( 1 ) ) ) . await ;
660
- let waited = before. elapsed ( ) ;
661
- // then I still don't get any.
662
- assert_matches ! ( found, PaginationToken :: HitEnd ) ;
663
- // and I've waited very little (there's no point in waiting in this case).
664
- assert ! ( waited. as_secs( ) < 1 ) ;
665
-
666
- // Now, reset state. We'll add an event *after* we've started waiting, this
667
- // time.
668
- room_event_cache. clear ( ) . await . unwrap ( ) ;
669
-
670
- spawn ( async move {
671
- sleep ( Duration :: from_secs ( 1 ) ) . await ;
672
-
673
- room_event_cache
674
- . inner
675
- . state
676
- . write ( )
677
- . await
678
- . with_events_mut ( |events| {
679
- events. push_events ( [ f
680
- . text_msg ( "this is the start of the timeline" )
681
- . into_event ( ) ] ) ;
682
- } )
683
- . await
684
- . unwrap ( ) ;
685
- } ) ;
686
-
687
- // If I wait for a pagination token,
688
- let before = Instant :: now ( ) ;
689
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: from_secs ( 2 ) ) ) . await ;
690
- let waited = before. elapsed ( ) ;
691
- // since sync has returned all events, and no prior gap, I've hit the end.
692
- assert_matches ! ( found, PaginationToken :: HitEnd ) ;
693
- // and I've waited for the whole duration.
694
- assert ! ( waited. as_secs( ) >= 2 ) ;
695
- assert ! ( waited. as_secs( ) < 3 ) ;
696
- }
697
-
698
- #[ async_test]
699
- async fn test_wait_for_pagination_token_already_present ( ) {
700
- let client = logged_in_client ( None ) . await ;
701
- let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
702
- client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
703
-
704
- let event_cache = client. event_cache ( ) ;
705
-
706
- event_cache. subscribe ( ) . unwrap ( ) ;
707
-
708
- let ( room_event_cache, _drop_handlers) = event_cache. for_room ( room_id) . await . unwrap ( ) ;
709
-
710
- let expected_token = "old" . to_owned ( ) ;
711
-
712
- // When I have events and multiple gaps, in a room,
713
- {
714
- room_event_cache
715
- . inner
716
- . state
717
- . write ( )
718
- . await
719
- . with_events_mut ( |room_events| {
720
- room_events. push_gap ( Gap { prev_token : expected_token. clone ( ) } ) ;
721
- room_events. push_events ( [ EventFactory :: new ( )
722
- . text_msg ( "yolo" )
723
- . sender ( user_id ! ( "@b:z.h" ) )
724
- . event_id ( event_id ! ( "$ida" ) )
725
- . into_event ( ) ] ) ;
726
- } )
727
- . await
728
- . unwrap ( ) ;
729
- }
730
-
731
- let pagination = room_event_cache. pagination ( ) ;
732
-
733
- // If I don't wait for a back-pagination token,
734
- let found = pagination. get_or_wait_for_token ( None ) . await ;
735
- // Then I get it.
736
- assert_eq ! ( found, PaginationToken :: HasMore ( expected_token. clone( ) ) ) ;
737
-
738
- // If I wait for a back-pagination token for 0 seconds,
739
- let before = Instant :: now ( ) ;
740
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: default ( ) ) ) . await ;
741
- let waited = before. elapsed ( ) ;
742
- // then I do get one.
743
- assert_eq ! ( found, PaginationToken :: HasMore ( expected_token. clone( ) ) ) ;
744
- // and I haven't waited long.
745
- assert ! ( waited. as_millis( ) < 100 ) ;
746
-
747
- // If I wait for a back-pagination token for 1 second,
748
- let before = Instant :: now ( ) ;
749
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: from_secs ( 1 ) ) ) . await ;
750
- let waited = before. elapsed ( ) ;
751
- // then I do get one.
752
- assert_eq ! ( found, PaginationToken :: HasMore ( expected_token) ) ;
753
- // and I haven't waited long.
754
- assert ! ( waited. as_millis( ) < 100 ) ;
755
- }
756
-
757
- #[ async_test]
758
- async fn test_wait_for_late_pagination_token ( ) {
759
- let client = logged_in_client ( None ) . await ;
760
- let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
761
- client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
762
-
763
- let event_cache = client. event_cache ( ) ;
764
-
765
- event_cache. subscribe ( ) . unwrap ( ) ;
766
-
767
- let ( room_event_cache, _drop_handles) = event_cache. for_room ( room_id) . await . unwrap ( ) ;
768
-
769
- let expected_token = "old" . to_owned ( ) ;
770
-
771
- let before = Instant :: now ( ) ;
772
- let cloned_expected_token = expected_token. clone ( ) ;
773
- let cloned_room_event_cache = room_event_cache. clone ( ) ;
774
- let insert_token_task = spawn ( async move {
775
- // If a backpagination token is inserted after 400 milliseconds,
776
- sleep ( Duration :: from_millis ( 400 ) ) . await ;
777
-
778
- cloned_room_event_cache
779
- . inner
780
- . state
781
- . write ( )
782
- . await
783
- . with_events_mut ( |events| {
784
- events. push_gap ( Gap { prev_token : cloned_expected_token } )
785
- } )
786
- . await
787
- . unwrap ( ) ;
788
- } ) ;
789
-
790
- let pagination = room_event_cache. pagination ( ) ;
791
-
792
- // Then first I don't get it (if I'm not waiting,)
793
- let found = pagination. get_or_wait_for_token ( None ) . await ;
794
- assert_matches ! ( found, PaginationToken :: None ) ;
795
-
796
- // And if I wait for the back-pagination token for 600ms,
797
- let found = pagination. get_or_wait_for_token ( Some ( Duration :: from_millis ( 600 ) ) ) . await ;
798
- let waited = before. elapsed ( ) ;
799
-
800
- // then I do get one eventually.
801
- assert_eq ! ( found, PaginationToken :: HasMore ( expected_token) ) ;
802
- // and I have waited between ~400 and ~1000 milliseconds.
803
- assert ! ( waited. as_secs( ) < 1 ) ;
804
- assert ! ( waited. as_millis( ) >= 400 ) ;
805
-
806
- // The task succeeded.
807
- insert_token_task. await . unwrap ( ) ;
808
- }
809
-
810
- #[ async_test]
811
- async fn test_get_latest_token ( ) {
812
- let client = logged_in_client ( None ) . await ;
813
- let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
814
- client. base_client ( ) . get_or_create_room ( room_id, RoomState :: Joined ) ;
815
-
816
- let event_cache = client. event_cache ( ) ;
817
-
818
- event_cache. subscribe ( ) . unwrap ( ) ;
819
-
820
- let ( room_event_cache, _drop_handles) = event_cache. for_room ( room_id) . await . unwrap ( ) ;
821
-
822
- let old_token = "old" . to_owned ( ) ;
823
- let new_token = "new" . to_owned ( ) ;
824
-
825
- // Assuming a room event cache that contains both an old and a new pagination
826
- // token, and events in between,
827
- room_event_cache
828
- . inner
829
- . state
830
- . write ( )
831
- . await
832
- . with_events_mut ( |events| {
833
- let f = EventFactory :: new ( ) . room ( room_id) . sender ( * ALICE ) ;
834
-
835
- // This simulates a valid representation of a room: first group of gap+events
836
- // were e.g. restored from the cache; second group of gap+events was received
837
- // from a subsequent sync.
838
- events. push_gap ( Gap { prev_token : old_token } ) ;
839
- events. push_events ( [ f. text_msg ( "oldest from cache" ) . into ( ) ] ) ;
840
-
841
- events. push_gap ( Gap { prev_token : new_token. clone ( ) } ) ;
842
- events. push_events ( [ f. text_msg ( "sync'd gappy timeline" ) . into ( ) ] ) ;
843
- } )
844
- . await
845
- . unwrap ( ) ;
846
-
847
- let pagination = room_event_cache. pagination ( ) ;
848
-
849
- // Retrieving the pagination token will return the most recent one, not the old
850
- // one.
851
- let found = pagination. get_or_wait_for_token ( None ) . await ;
852
- assert_eq ! ( found, PaginationToken :: HasMore ( new_token) ) ;
853
- }
854
- }
855
- }
0 commit comments