14
14
15
15
use std:: { sync:: Arc , time:: Duration } ;
16
16
17
- use anyhow:: { Ok , Result } ;
17
+ use anyhow:: Result ;
18
18
use assert_matches:: assert_matches;
19
19
use assert_matches2:: assert_let;
20
20
use assign:: assign;
@@ -35,16 +35,34 @@ use tokio::time::timeout;
35
35
36
36
use crate :: helpers:: TestClientBuilder ;
37
37
38
+ /// Sync until we receive an update for a room.
39
+ ///
40
+ /// Beware: it may sync forever, if it doesn't receive such an update!
41
+ async fn sync_until_update_for_room ( client : & Client , room_id : & RoomId ) -> Result < ( ) > {
42
+ client
43
+ . sync_with_callback ( SyncSettings :: default ( ) , |response| async move {
44
+ if response. rooms . join . iter ( ) . any ( |( id, _) | id == room_id) {
45
+ LoopCtrl :: Break
46
+ } else {
47
+ LoopCtrl :: Continue
48
+ }
49
+ } )
50
+ . await ?;
51
+ Ok ( ( ) )
52
+ }
53
+
38
54
#[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
39
55
async fn test_toggling_reaction ( ) -> Result < ( ) > {
40
56
// Set up sync for user Alice, and create a room.
41
57
let alice = TestClientBuilder :: new ( "alice" . to_owned ( ) ) . use_sqlite ( ) . build ( ) . await ?;
58
+
42
59
let user_id = alice. user_id ( ) . unwrap ( ) . to_owned ( ) ;
43
60
let room = alice
44
61
. create_room ( assign ! ( CreateRoomRequest :: new( ) , {
45
62
is_direct: true ,
46
63
} ) )
47
64
. await ?;
65
+
48
66
let room_id = room. room_id ( ) ;
49
67
50
68
// Create a timeline for this room.
@@ -57,12 +75,25 @@ async fn test_toggling_reaction() -> Result<()> {
57
75
// Sync until the remote echo arrives.
58
76
let mut num_attempts = 0 ;
59
77
let event_id = loop {
60
- sync_room ( & alice, room_id) . await ?;
78
+ // We expect a quick update from sync, so timeout after 10 seconds, and ignore
79
+ // timeouts; they might just indicate we received the necessary
80
+ // information on the previous attempt, but racily tried to read the
81
+ // timeline items.
82
+ if let Ok ( sync_result) =
83
+ timeout ( Duration :: from_secs ( 10 ) , sync_until_update_for_room ( & alice, room_id) ) . await
84
+ {
85
+ sync_result?;
86
+ }
87
+
88
+ // Let time to the timeline for processing the update, at most 1 second.
89
+ let _ = timeout ( Duration :: from_secs ( 1 ) , stream. next ( ) ) . await ;
90
+
61
91
let items = timeline. items ( ) . await ;
62
92
let last_item = items. last ( ) . unwrap ( ) . as_event ( ) . unwrap ( ) ;
63
93
if !last_item. is_local_echo ( ) {
64
94
break last_item. event_id ( ) . unwrap ( ) . to_owned ( ) ;
65
95
}
96
+
66
97
if num_attempts == 2 {
67
98
panic ! ( "had 3 sync responses and no echo of our own event" ) ;
68
99
}
@@ -181,19 +212,6 @@ async fn assert_remote_added(
181
212
assert ! ( reaction. unwrap( ) . timestamp <= MilliSecondsSinceUnixEpoch :: now( ) ) ;
182
213
}
183
214
184
- async fn sync_room ( client : & Client , room_id : & RoomId ) -> Result < ( ) > {
185
- client
186
- . sync_with_callback ( SyncSettings :: default ( ) , |response| async move {
187
- if response. rooms . join . iter ( ) . any ( |( id, _) | id == room_id) {
188
- LoopCtrl :: Break
189
- } else {
190
- LoopCtrl :: Continue
191
- }
192
- } )
193
- . await ?;
194
- Ok ( ( ) )
195
- }
196
-
197
215
async fn assert_event_is_updated (
198
216
stream : & mut ( impl Stream < Item = VectorDiff < Arc < TimelineItem > > > + Unpin ) ,
199
217
event_id : & EventId ,
0 commit comments