17
17
use std:: { fmt, time:: Duration } ;
18
18
19
19
use async_channel:: { Receiver , Sender } ;
20
+ use matrix_sdk_common:: executor;
20
21
use ruma:: api:: client:: delayed_events:: DelayParameters ;
21
22
use serde:: de:: { self , Deserialize , Deserializer , Visitor } ;
22
23
use tokio:: sync:: mpsc:: { unbounded_channel, UnboundedSender } ;
@@ -67,6 +68,9 @@ pub struct WidgetDriver {
67
68
///
68
69
/// Only set if a subscription happened ([`Action::Subscribe`]).
69
70
event_forwarding_guard : Option < DropGuard > ,
71
+
72
+ /// JoinHandle for the matrix event subscribe task.
73
+ matrix_subscribe_join_handle : Option < executor:: JoinHandle < ( ) > > ,
70
74
}
71
75
72
76
/// A handle that encapsulates the communication between a widget driver and the
@@ -115,7 +119,13 @@ impl WidgetDriver {
115
119
let ( from_widget_tx, from_widget_rx) = async_channel:: unbounded ( ) ;
116
120
let ( to_widget_tx, to_widget_rx) = async_channel:: unbounded ( ) ;
117
121
118
- let driver = Self { settings, from_widget_rx, to_widget_tx, event_forwarding_guard : None } ;
122
+ let driver = Self {
123
+ settings,
124
+ from_widget_rx,
125
+ to_widget_tx,
126
+ event_forwarding_guard : None ,
127
+ matrix_subscribe_join_handle : None ,
128
+ } ;
119
129
let channels = WidgetDriverHandle { from_widget_tx, to_widget_rx } ;
120
130
121
131
( driver, channels)
@@ -139,17 +149,15 @@ impl WidgetDriver {
139
149
let ( incoming_msg_tx, mut incoming_msg_rx) = unbounded_channel ( ) ;
140
150
141
151
// Forward all of the incoming messages from the widget.
142
- matrix_sdk_common :: executor:: spawn ( {
152
+ let _handle = executor:: spawn ( {
143
153
let incoming_msg_tx = incoming_msg_tx. clone ( ) ;
144
154
let from_widget_rx = self . from_widget_rx . clone ( ) ;
145
155
async move {
146
156
while let Ok ( msg) = from_widget_rx. recv ( ) . await {
147
157
let _ = incoming_msg_tx. send ( IncomingMessage :: WidgetMessage ( msg) ) ;
148
158
}
149
159
}
150
- } )
151
- . await
152
- . map_err ( |_| ( ) ) ?;
160
+ } ) ;
153
161
154
162
// Create widget API machine.
155
163
let ( mut widget_machine, initial_actions) = WidgetMachine :: new (
@@ -178,7 +186,6 @@ impl WidgetDriver {
178
186
. await ?;
179
187
}
180
188
}
181
-
182
189
Ok ( ( ) )
183
190
}
184
191
@@ -256,12 +263,10 @@ impl WidgetDriver {
256
263
( token. child_token ( ) , token. drop_guard ( ) )
257
264
} ;
258
265
259
- self . event_forwarding_guard = Some ( guard) ;
260
-
261
266
let mut matrix = matrix_driver. events ( ) ;
262
267
let incoming_msg_tx = incoming_msg_tx. clone ( ) ;
263
268
264
- matrix_sdk_common :: executor:: spawn ( async move {
269
+ let handle = executor:: spawn ( async move {
265
270
loop {
266
271
tokio:: select! {
267
272
_ = stop_forwarding. cancelled( ) => {
@@ -275,11 +280,15 @@ impl WidgetDriver {
275
280
}
276
281
}
277
282
}
278
- } ) . await . map_err ( |_|( ) ) ?;
283
+ } ) ;
284
+
285
+ self . matrix_subscribe_join_handle = Some ( handle) ;
286
+ self . event_forwarding_guard = Some ( guard) ;
279
287
}
280
288
281
289
Action :: Unsubscribe => {
282
290
self . event_forwarding_guard = None ;
291
+ self . matrix_subscribe_join_handle = None ;
283
292
}
284
293
}
285
294
0 commit comments