@@ -387,7 +387,7 @@ async fn test_backpaginate_once() {
387
387
}
388
388
389
389
#[ async_test]
390
- async fn test_backpaginate_multiple_iterations ( ) {
390
+ async fn test_backpaginate_many_times_with_many_iterations ( ) {
391
391
let ( client, server) = logged_in_client_with_server ( ) . await ;
392
392
393
393
let event_cache = client. event_cache ( ) ;
@@ -491,6 +491,120 @@ async fn test_backpaginate_multiple_iterations() {
491
491
assert ! ( room_stream. is_empty( ) ) ;
492
492
}
493
493
494
+ #[ async_test]
495
+ async fn test_backpaginate_many_times_with_one_iteration ( ) {
496
+ let ( client, server) = logged_in_client_with_server ( ) . await ;
497
+
498
+ let event_cache = client. event_cache ( ) ;
499
+
500
+ // Immediately subscribe the event cache to sync updates.
501
+ event_cache. subscribe ( ) . unwrap ( ) ;
502
+
503
+ // If I sync and get informed I've joined The Room, and get a previous batch
504
+ // token,
505
+ let room_id = room_id ! ( "!omelette:fromage.fr" ) ;
506
+
507
+ let event_builder = EventBuilder :: new ( ) ;
508
+ let mut sync_builder = SyncResponseBuilder :: new ( ) ;
509
+
510
+ {
511
+ sync_builder. add_joined_room (
512
+ JoinedRoomBuilder :: new ( room_id)
513
+ // Note to self: a timeline must have at least single event to be properly
514
+ // serialized.
515
+ . add_timeline_event ( event_builder. make_sync_message_event (
516
+ user_id ! ( "@a:b.c" ) ,
517
+ RoomMessageEventContent :: text_plain ( "heyo" ) ,
518
+ ) )
519
+ . set_timeline_prev_batch ( "prev_batch" . to_owned ( ) ) ,
520
+ ) ;
521
+ let response_body = sync_builder. build_json_sync_response ( ) ;
522
+
523
+ mock_sync ( & server, response_body, None ) . await ;
524
+ client. sync_once ( Default :: default ( ) ) . await . unwrap ( ) ;
525
+ server. reset ( ) . await ;
526
+ }
527
+
528
+ let ( room_event_cache, _drop_handles) =
529
+ client. get_room ( room_id) . unwrap ( ) . event_cache ( ) . await . unwrap ( ) ;
530
+
531
+ let ( events, mut room_stream) = room_event_cache. subscribe ( ) . await . unwrap ( ) ;
532
+
533
+ // This is racy: either the initial message has been processed by the event
534
+ // cache (and no room updates will happen in this case), or it hasn't, and
535
+ // the stream will return the next message soon.
536
+ if events. is_empty ( ) {
537
+ let _ = room_stream. recv ( ) . await . expect ( "read error" ) ;
538
+ } else {
539
+ assert_eq ! ( events. len( ) , 1 ) ;
540
+ }
541
+
542
+ let mut num_iterations = 0 ;
543
+ let mut global_events = Vec :: new ( ) ;
544
+ let mut global_reached_start = false ;
545
+
546
+ // The first back-pagination will return these two.
547
+ mock_messages (
548
+ & server,
549
+ "prev_batch" ,
550
+ Some ( "prev_batch2" ) ,
551
+ non_sync_events ! ( event_builder, [ ( room_id, "$2" : "world" ) , ( room_id, "$3" : "hello" ) ] ) ,
552
+ )
553
+ . await ;
554
+
555
+ // The second round of back-pagination will return this one.
556
+ mock_messages (
557
+ & server,
558
+ "prev_batch2" ,
559
+ None ,
560
+ non_sync_events ! ( event_builder, [ ( room_id, "$4" : "oh well" ) , ] ) ,
561
+ )
562
+ . await ;
563
+
564
+ // Then if I backpaginate in a loop,
565
+ let pagination = room_event_cache. pagination ( ) ;
566
+ while pagination. get_or_wait_for_token ( ) . await . is_some ( ) {
567
+ pagination
568
+ . run_backwards ( 20 , |outcome, _timeline_has_been_reset| {
569
+ if !global_reached_start {
570
+ global_reached_start = outcome. reached_start ;
571
+ }
572
+
573
+ global_events. extend ( outcome. events ) ;
574
+
575
+ ready ( if outcome. reached_start {
576
+ ControlFlow :: Break ( ( ) )
577
+ } else {
578
+ ControlFlow :: Continue ( ( ) )
579
+ } )
580
+ } )
581
+ . await
582
+ . unwrap ( ) ;
583
+
584
+ num_iterations += 1 ;
585
+ }
586
+
587
+ // I'll get all the previous events,
588
+ assert_eq ! ( num_iterations, 1 ) ; // in one iteration!
589
+ assert ! ( global_reached_start) ;
590
+
591
+ assert_event_matches_msg ( & global_events[ 0 ] , "world" ) ;
592
+ assert_event_matches_msg ( & global_events[ 1 ] , "hello" ) ;
593
+ assert_event_matches_msg ( & global_events[ 2 ] , "oh well" ) ;
594
+ assert_eq ! ( global_events. len( ) , 3 ) ;
595
+
596
+ // And next time I'll open the room, I'll get the events in the right order.
597
+ let ( events, _receiver) = room_event_cache. subscribe ( ) . await . unwrap ( ) ;
598
+
599
+ assert_event_matches_msg ( & events[ 0 ] , "oh well" ) ;
600
+ assert_event_matches_msg ( & events[ 1 ] , "hello" ) ;
601
+ assert_event_matches_msg ( & events[ 2 ] , "world" ) ;
602
+ assert_event_matches_msg ( & events[ 3 ] , "heyo" ) ;
603
+ assert_eq ! ( events. len( ) , 4 ) ;
604
+
605
+ assert ! ( room_stream. is_empty( ) ) ;
606
+ }
607
+
494
608
#[ async_test]
495
609
async fn test_reset_while_backpaginating ( ) {
496
610
let ( client, server) = logged_in_client_with_server ( ) . await ;
0 commit comments