Skip to content

Commit b643b68

Browse files
committed
WidgetDriver: add matrix driver toDevice support (reading and sending events via cs api)
This also hooks up the widget via the machine actions. And adds toDevice events to the subscription.
1 parent 9f84d9a commit b643b68

File tree

3 files changed

+137
-23
lines changed

3 files changed

+137
-23
lines changed

crates/matrix-sdk/src/widget/machine/to_widget.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl ToWidgetRequest for NotifyNewMatrixEvent {
128128
#[derive(Deserialize)]
129129
pub(crate) struct Empty {}
130130

131-
/// Notify the widget that we received a new matrix event.
131+
/// Notify the widget that we received a new matrix to device event.
132132
/// This is a "response" to the widget subscribing to the events in the room.
133133
#[derive(Serialize)]
134134
#[serde(transparent)]

crates/matrix-sdk/src/widget/matrix.rs

Lines changed: 118 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,24 @@
1717
1818
use std::collections::BTreeMap;
1919

20-
use matrix_sdk_base::deserialized_responses::RawAnySyncOrStrippedState;
20+
use matrix_sdk_base::deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState};
2121
use ruma::{
2222
api::client::{
2323
account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
2424
delayed_events::{self, update_delayed_event::unstable::UpdateAction},
2525
filter::RoomEventFilter,
26+
to_device::send_event_to_device::{self, v3::Request as RumaToDeviceRequest},
2627
},
2728
assign,
2829
events::{
2930
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
30-
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, MessageLikeEventType,
31-
StateEventType, TimelineEventType,
31+
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent,
32+
AnyToDeviceEventContent, MessageLikeEventType, StateEventType, TimelineEventType,
33+
ToDeviceEventType,
3234
},
3335
serde::{from_raw_json_value, Raw},
34-
EventId, RoomId, TransactionId,
36+
to_device::DeviceIdOrAllDevices,
37+
EventId, OwnedUserId, RoomId, TransactionId,
3538
};
3639
use serde_json::{value::RawValue as RawJsonValue, Value};
3740
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
@@ -86,7 +89,11 @@ impl MatrixDriver {
8689
) -> Result<Vec<Raw<AnyTimelineEvent>>> {
8790
let room_id = self.room.room_id();
8891
let convert = |sync_or_stripped_state| match sync_or_stripped_state {
89-
RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)),
92+
RawAnySyncOrStrippedState::Sync(ev) => with_attached_room_id(ev.cast_ref(), room_id)
93+
.map_err(|e| {
94+
error!("failed to convert event from `get_state_event` response:{}", e)
95+
})
96+
.ok(),
9097
RawAnySyncOrStrippedState::Stripped(_) => {
9198
error!("MatrixDriver can't operate in invited rooms");
9299
None
@@ -181,7 +188,7 @@ impl MatrixDriver {
181188

182189
/// Starts forwarding new room events. Once the returned `EventReceiver`
183190
/// is dropped, forwarding will be stopped.
184-
pub(crate) fn events(&self) -> EventReceiver {
191+
pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
185192
let (tx, rx) = unbounded_channel();
186193
let room_id = self.room.room_id().to_owned();
187194

@@ -190,14 +197,29 @@ impl MatrixDriver {
190197
let _room_id = room_id.clone();
191198
let handle_msg_like =
192199
self.room.add_event_handler(move |raw: Raw<AnySyncMessageLikeEvent>| {
193-
let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id));
200+
match with_attached_room_id(raw.cast_ref(), &_room_id) {
201+
Ok(event_with_room_id) => {
202+
let _ = _tx.send(event_with_room_id);
203+
}
204+
Err(e) => {
205+
error!("Failed to attach room id to message like event: {}", e);
206+
}
207+
}
194208
async {}
195209
});
196210
let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like);
197-
211+
let _room_id = room_id;
212+
let _tx = tx;
198213
// Get only all state events from the state section of the sync.
199214
let handle_state = self.room.add_event_handler(move |raw: Raw<AnySyncStateEvent>| {
200-
let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id));
215+
match with_attached_room_id(raw.cast_ref(), &_room_id) {
216+
Ok(event_with_room_id) => {
217+
let _ = _tx.send(event_with_room_id);
218+
}
219+
Err(e) => {
220+
error!("Failed to attach room id to state event: {}", e);
221+
}
222+
}
201223
async {}
202224
});
203225
let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state);
@@ -208,25 +230,102 @@ impl MatrixDriver {
208230
// section of the sync will not be forwarded to the widget.
209231
// TODO annotate the events and send both timeline and state section state
210232
// events.
211-
EventReceiver { rx, _drop_guards: [drop_guard_msg_like, drop_guard_state] }
233+
EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] }
234+
}
235+
236+
/// Starts forwarding new room events. Once the returned `EventReceiver`
237+
/// is dropped, forwarding will be stopped.
238+
pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
239+
let (tx, rx) = unbounded_channel();
240+
241+
let to_device_handle = self.room.client().add_event_handler(
242+
move |raw: Raw<AnyToDeviceEvent>, encryption_info: Option<EncryptionInfo>| {
243+
match with_attached_encryption_flag(raw, &encryption_info) {
244+
Ok(ev) => {
245+
let _ = tx.send(ev);
246+
}
247+
Err(e) => {
248+
error!("Failed to attach encryption flag to to_device event: {}", e);
249+
}
250+
}
251+
async {}
252+
},
253+
);
254+
255+
let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
256+
EventReceiver { rx, _drop_guards: vec![drop_guard] }
257+
}
258+
259+
/// It will ignore all devices where errors occurred or where the device is
260+
/// not verified or where th user has a has_verification_violation.
261+
pub(crate) async fn send_to_device(
262+
&self,
263+
event_type: ToDeviceEventType,
264+
encrypted: bool,
265+
messages: BTreeMap<
266+
OwnedUserId,
267+
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
268+
>,
269+
) -> Result<send_event_to_device::v3::Response> {
270+
let client = self.room.client();
271+
272+
let request = if encrypted {
273+
return Err(Error::UnknownError(
274+
"Sending encrypted to_device events is not supported by the widget driver.".into(),
275+
));
276+
} else {
277+
RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages)
278+
};
279+
280+
let response = client.send(request).await;
281+
282+
response.map_err(Into::into)
212283
}
213284
}
214285

215286
/// A simple entity that wraps an `UnboundedReceiver`
216287
/// along with the drop guard for the room event handler.
217-
pub(crate) struct EventReceiver {
218-
rx: UnboundedReceiver<Raw<AnyTimelineEvent>>,
219-
_drop_guards: [EventHandlerDropGuard; 2],
288+
pub(crate) struct EventReceiver<E> {
289+
rx: UnboundedReceiver<E>,
290+
_drop_guards: Vec<EventHandlerDropGuard>,
220291
}
221292

222-
impl EventReceiver {
223-
pub(crate) async fn recv(&mut self) -> Option<Raw<AnyTimelineEvent>> {
293+
impl<T> EventReceiver<T> {
294+
pub(crate) async fn recv(&mut self) -> Option<T> {
224295
self.rx.recv().await
225296
}
226297
}
227298

228-
fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
229-
let mut ev_obj = raw_ev.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
230-
ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
231-
Raw::new(&ev_obj).unwrap().cast()
299+
/// Attach a room id to the event. This is needed because the widget API
300+
/// requires the room id to be present in the event.
301+
302+
fn with_attached_room_id(
303+
raw: &Raw<AnySyncTimelineEvent>,
304+
room_id: &RoomId,
305+
) -> Result<Raw<AnyTimelineEvent>> {
306+
// This is the only modification we need to do to the events otherwise they are
307+
// just forwarded raw to the widget.
308+
// This is why we do the serialization dance here to allow the optimization of
309+
// using `BTreeMap<String, Box<RawJsonValue>` instead of serializing the full event.
310+
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
311+
Ok(mut ev_mut) => {
312+
ev_mut.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id)?);
313+
Ok(Raw::new(&ev_mut)?.cast())
314+
}
315+
Err(e) => Err(Error::from(e)),
316+
}
317+
}
318+
319+
fn with_attached_encryption_flag(
320+
raw: Raw<AnyToDeviceEvent>,
321+
encryption_info: &Option<EncryptionInfo>,
322+
) -> Result<Raw<AnyToDeviceEvent>> {
323+
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
324+
Ok(mut ev_mut) => {
325+
let encrypted = encryption_info.is_some();
326+
ev_mut.insert("encrypted".to_owned(), serde_json::value::to_raw_value(&encrypted)?);
327+
Ok(Raw::new(&ev_mut)?.cast())
328+
}
329+
Err(e) => Err(Error::from(e)),
330+
}
232331
}

crates/matrix-sdk/src/widget/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,16 @@ impl WidgetDriver {
237237
.await
238238
.map(MatrixDriverResponse::MatrixDelayedEventUpdate),
239239

240-
MatrixDriverRequestData::SendToDeviceEvent(req) => todo!(),
240+
MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => {
241+
matrix_driver
242+
.send_to_device(
243+
send_to_device_request.event_type.into(),
244+
send_to_device_request.encrypted,
245+
send_to_device_request.messages,
246+
)
247+
.await
248+
.map(MatrixDriverResponse::MatrixToDeviceSent)
249+
}
241250
};
242251

243252
// Forward the matrix driver response to the incoming message stream.
@@ -259,7 +268,8 @@ impl WidgetDriver {
259268

260269
self.event_forwarding_guard = Some(guard);
261270

262-
let mut matrix = matrix_driver.events();
271+
let mut events_receiver = matrix_driver.events();
272+
let mut to_device_receiver = matrix_driver.to_device_events();
263273
let incoming_msg_tx = incoming_msg_tx.clone();
264274

265275
spawn(async move {
@@ -270,10 +280,15 @@ impl WidgetDriver {
270280
return;
271281
}
272282

273-
Some(event) = matrix.recv() => {
283+
Some(event) = events_receiver.recv() => {
274284
// Forward all events to the incoming messages stream.
275285
let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event));
276286
}
287+
288+
Some(event) = to_device_receiver.recv() => {
289+
// Forward all events to the incoming messages stream.
290+
let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event));
291+
}
277292
}
278293
}
279294
});

0 commit comments

Comments
 (0)