Skip to content

Commit 16171b6

Browse files
committed
WidgetDriver: add matrix driver toDevice support (reading and sending events via cs api)
This also hooks up the widget via the machine actions. And adds toDevice events to the subscription.
1 parent 6237f29 commit 16171b6

File tree

2 files changed

+254
-24
lines changed

2 files changed

+254
-24
lines changed

crates/matrix-sdk/src/widget/matrix.rs

Lines changed: 236 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,36 @@
1717
1818
use std::collections::BTreeMap;
1919

20-
use matrix_sdk_base::deserialized_responses::RawAnySyncOrStrippedState;
20+
use futures_util::future::join_all;
21+
use matrix_sdk_base::deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState};
22+
use matrix_sdk_common::executor::spawn;
2123
use ruma::{
2224
api::client::{
2325
account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
2426
delayed_events::{self, update_delayed_event::unstable::UpdateAction},
2527
filter::RoomEventFilter,
28+
to_device::send_event_to_device::{self, v3::Request as RumaToDeviceRequest},
2629
},
2730
assign,
2831
events::{
2932
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
30-
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, MessageLikeEventType,
31-
StateEventType, TimelineEventType,
33+
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent,
34+
AnyToDeviceEventContent, MessageLikeEventType, StateEventType, TimelineEventType,
35+
ToDeviceEventType,
3236
},
3337
serde::{from_raw_json_value, Raw},
34-
EventId, RoomId, TransactionId,
38+
to_device::DeviceIdOrAllDevices,
39+
EventId, OwnedUserId, RoomId, TransactionId, UserId,
3540
};
3641
use serde_json::{value::RawValue as RawJsonValue, Value};
3742
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
38-
use tracing::error;
43+
use tracing::{error, info, warn};
3944

4045
use super::{machine::SendEventResponse, StateKeySelector};
41-
use crate::{event_handler::EventHandlerDropGuard, room::MessagesOptions, Error, Result, Room};
46+
use crate::{
47+
encryption::identities::Device, event_handler::EventHandlerDropGuard, room::MessagesOptions,
48+
Client, Error, Result, Room,
49+
};
4250

4351
/// Thin wrapper around a [`Room`] that provides functionality relevant for
4452
/// widgets.
@@ -86,7 +94,11 @@ impl MatrixDriver {
8694
) -> Result<Vec<Raw<AnyTimelineEvent>>> {
8795
let room_id = self.room.room_id();
8896
let convert = |sync_or_stripped_state| match sync_or_stripped_state {
89-
RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)),
97+
RawAnySyncOrStrippedState::Sync(ev) => with_attached_room_id(ev.cast_ref(), room_id)
98+
.map_err(|e| {
99+
error!("failed to convert event from `get_state_event` response:{}", e)
100+
})
101+
.ok(),
90102
RawAnySyncOrStrippedState::Stripped(_) => {
91103
error!("MatrixDriver can't operate in invited rooms");
92104
None
@@ -181,7 +193,7 @@ impl MatrixDriver {
181193

182194
/// Starts forwarding new room events. Once the returned `EventReceiver`
183195
/// is dropped, forwarding will be stopped.
184-
pub(crate) fn events(&self) -> EventReceiver {
196+
pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
185197
let (tx, rx) = unbounded_channel();
186198
let room_id = self.room.room_id().to_owned();
187199

@@ -190,14 +202,29 @@ impl MatrixDriver {
190202
let _room_id = room_id.clone();
191203
let handle_msg_like =
192204
self.room.add_event_handler(move |raw: Raw<AnySyncMessageLikeEvent>| {
193-
let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id));
205+
match with_attached_room_id(raw.cast_ref(), &_room_id) {
206+
Ok(event_with_room_id) => {
207+
let _ = _tx.send(event_with_room_id);
208+
}
209+
Err(e) => {
210+
error!("Failed to attach room id to message like event: {}", e);
211+
}
212+
}
194213
async {}
195214
});
196215
let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like);
197-
216+
let _room_id = room_id;
217+
let _tx = tx;
198218
// Get only all state events from the state section of the sync.
199219
let handle_state = self.room.add_event_handler(move |raw: Raw<AnySyncStateEvent>| {
200-
let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id));
220+
match with_attached_room_id(raw.cast_ref(), &_room_id) {
221+
Ok(event_with_room_id) => {
222+
let _ = _tx.send(event_with_room_id);
223+
}
224+
Err(e) => {
225+
error!("Failed to attach room id to state event: {}", e);
226+
}
227+
}
201228
async {}
202229
});
203230
let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state);
@@ -208,25 +235,213 @@ impl MatrixDriver {
208235
// section of the sync will not be forwarded to the widget.
209236
// TODO annotate the events and send both timeline and state section state
210237
// events.
211-
EventReceiver { rx, _drop_guards: [drop_guard_msg_like, drop_guard_state] }
238+
EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] }
239+
}
240+
241+
/// Starts forwarding new room events. Once the returned `EventReceiver`
242+
/// is dropped, forwarding will be stopped.
243+
pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
244+
let (tx, rx) = unbounded_channel();
245+
246+
let to_device_handle =
247+
self.room.client().add_event_handler(move |raw: Raw<AnyToDeviceEvent>| {
248+
match with_attached_encryption_flag(raw, &encryption_info) {
249+
Ok(ev) => {
250+
let _ = tx.send(ev);
251+
}
252+
Err(e) => {
253+
error!("Failed to attach encryption flag to to_device event: {}", e);
254+
}
255+
}
256+
async {}
257+
});
258+
259+
let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
260+
EventReceiver { rx, _drop_guards: vec![drop_guard] }
261+
}
262+
263+
/// It will ignore all devices where errors occurred or where the device is
264+
/// not verified or where th user has a has_verification_violation.
265+
pub(crate) async fn send_to_device(
266+
&self,
267+
event_type: ToDeviceEventType,
268+
encrypted: bool,
269+
messages: BTreeMap<
270+
OwnedUserId,
271+
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
272+
>,
273+
) -> Result<send_event_to_device::v3::Response> {
274+
let client = self.room.client();
275+
/// This encrypts to device content for a collection of devices.
276+
/// It will ignore all devices where errors occurred or where the device
277+
/// is not verified or where th user has a has_verification_violation.
278+
async fn encrypted_content_for_devices(
279+
unencrypted_content: &Raw<AnyToDeviceEventContent>,
280+
devices: Vec<Device>,
281+
event_type: &ToDeviceEventType,
282+
) -> Result<impl Iterator<Item = (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>
283+
{
284+
let content: Value =
285+
unencrypted_content.deserialize_as().map_err(Into::<Error>::into)?;
286+
let event_type = event_type.clone();
287+
let device_content_tasks = devices.into_iter().map(|device| spawn({
288+
let event_type = event_type.clone();
289+
let content = content.clone();
290+
291+
async move {
292+
if !device.is_cross_signed_by_owner() {
293+
info!("Device {} is not verified, skipping encryption", device.device_id());
294+
return None;
295+
}
296+
match device
297+
.inner
298+
.encrypt_event_raw(&event_type.to_string(), &content)
299+
.await {
300+
Ok(encrypted) => Some((device.device_id().to_owned().into(), encrypted.cast())),
301+
Err(e) =>{ info!("Failed to encrypt to_device event from widget for device: {} because, {}", device.device_id(), e); None},
302+
}
303+
}
304+
}));
305+
let device_encrypted_content_map =
306+
join_all(device_content_tasks).await.into_iter().flatten().flatten();
307+
Ok(device_encrypted_content_map)
308+
}
309+
310+
/// Convert the device content map for one user into the same content
311+
/// map with encrypted content This needs to flatten the vectors
312+
/// we get from `encrypted_content_for_devices`
313+
/// since one `DeviceIdOrAllDevices` id can be multiple devices.
314+
async fn encrypted_device_content_map(
315+
client: &Client,
316+
user_id: &UserId,
317+
event_type: &ToDeviceEventType,
318+
device_content_map: BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
319+
) -> BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>> {
320+
let device_map_futures =
321+
device_content_map.into_iter().map(|(device_or_all_id, content)| spawn({
322+
let client = client.clone();
323+
let user_id = user_id.to_owned();
324+
let event_type = event_type.clone();
325+
async move {
326+
let Ok(user_devices) = client.encryption().get_user_devices(&user_id).await else {
327+
warn!("Failed to get user devices for user: {}", user_id);
328+
return None;
329+
};
330+
let Ok(user_identity) = client.encryption().get_user_identity(&user_id).await else{
331+
warn!("Failed to get user identity for user: {}", user_id);
332+
return None;
333+
};
334+
if user_identity.map(|i|i.has_verification_violation()).unwrap_or(false) {
335+
info!("User {} has a verification violation, skipping encryption", user_id);
336+
return None;
337+
}
338+
let devices: Vec<Device> = match device_or_all_id {
339+
DeviceIdOrAllDevices::DeviceId(device_id) => {
340+
vec![user_devices.get(&device_id)].into_iter().flatten().collect()
341+
}
342+
DeviceIdOrAllDevices::AllDevices => user_devices.devices().collect(),
343+
};
344+
encrypted_content_for_devices(
345+
&content,
346+
devices,
347+
&event_type,
348+
)
349+
.await
350+
.map_err(|e| info!("WidgetDriver: could not encrypt content for to device widget event content: {}. because, {}", content.json(), e))
351+
.ok()
352+
}}));
353+
let content_map_iterator = join_all(device_map_futures).await.into_iter();
354+
355+
// The first flatten takes the iterator over Result<Option<impl Iterator<Item =
356+
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>, JoinError>>
357+
// and flattens the Result (drops Err() items)
358+
// The second takes the iterator over: Option<impl Iterator<Item =
359+
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>>
360+
// and flattens the Option (drops None items)
361+
// The third takes the iterator over iterators: impl Iterator<Item =
362+
// (DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>)>
363+
// and flattens it to just an iterator over (DeviceIdOrAllDevices,
364+
// Raw<AnyToDeviceEventContent>)
365+
content_map_iterator.flatten().flatten().flatten().collect()
366+
}
367+
368+
let request = if encrypted {
369+
// We first want to get all missing session before we start any to device
370+
// sending!
371+
client.claim_one_time_keys(messages.keys().map(|u| u.as_ref())).await?;
372+
let encrypted_content: BTreeMap<
373+
OwnedUserId,
374+
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
375+
> = join_all(messages.into_iter().map(|(user_id, device_content_map)| {
376+
let event_type = event_type.clone();
377+
async move {
378+
(
379+
user_id.clone(),
380+
encrypted_device_content_map(
381+
&self.room.client(),
382+
&user_id,
383+
&event_type,
384+
device_content_map,
385+
)
386+
.await,
387+
)
388+
}
389+
}))
390+
.await
391+
.into_iter()
392+
.collect();
393+
394+
RumaToDeviceRequest::new_raw(
395+
ToDeviceEventType::RoomEncrypted,
396+
TransactionId::new(),
397+
encrypted_content,
398+
)
399+
} else {
400+
RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages)
401+
};
402+
403+
let response = client.send(request).await;
404+
405+
response.map_err(Into::into)
212406
}
213407
}
214408

215409
/// A simple entity that wraps an `UnboundedReceiver`
216410
/// along with the drop guard for the room event handler.
217-
pub(crate) struct EventReceiver {
218-
rx: UnboundedReceiver<Raw<AnyTimelineEvent>>,
219-
_drop_guards: [EventHandlerDropGuard; 2],
411+
pub(crate) struct EventReceiver<E> {
412+
rx: UnboundedReceiver<E>,
413+
_drop_guards: Vec<EventHandlerDropGuard>,
220414
}
221415

222-
impl EventReceiver {
223-
pub(crate) async fn recv(&mut self) -> Option<Raw<AnyTimelineEvent>> {
416+
impl<T> EventReceiver<T> {
417+
pub(crate) async fn recv(&mut self) -> Option<T> {
224418
self.rx.recv().await
225419
}
226420
}
227421

228-
fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
229-
let mut ev_obj = raw_ev.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
230-
ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
231-
Raw::new(&ev_obj).unwrap().cast()
422+
fn with_attached_room_id(
423+
raw: &Raw<AnySyncTimelineEvent>,
424+
room_id: &RoomId,
425+
) -> Result<Raw<AnyTimelineEvent>> {
426+
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
427+
Ok(mut ev_mut) => {
428+
ev_mut.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id)?);
429+
Ok(Raw::new(&ev_mut)?.cast())
430+
}
431+
Err(e) => Err(Error::from(e)),
432+
}
433+
}
434+
435+
fn with_attached_encryption_flag(
436+
raw: Raw<AnyToDeviceEvent>,
437+
encryption_info: &Option<EncryptionInfo>,
438+
) -> Result<Raw<AnyToDeviceEvent>> {
439+
match raw.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>() {
440+
Ok(mut ev_mut) => {
441+
let encrypted = encryption_info.is_some();
442+
ev_mut.insert("encrypted".to_owned(), serde_json::value::to_raw_value(&encrypted)?);
443+
Ok(Raw::new(&ev_mut)?.cast())
444+
}
445+
Err(e) => Err(Error::from(e)),
446+
}
232447
}

crates/matrix-sdk/src/widget/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,16 @@ impl WidgetDriver {
236236
.await
237237
.map(MatrixDriverResponse::MatrixDelayedEventUpdate),
238238

239-
MatrixDriverRequestData::SendToDeviceEvent(req) => todo!(),
239+
MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => {
240+
matrix_driver
241+
.send_to_device(
242+
send_to_device_request.event_type,
243+
send_to_device_request.encrypted,
244+
send_to_device_request.messages,
245+
)
246+
.await
247+
.map(MatrixDriverResponse::MatrixToDeviceSent)
248+
}
240249
};
241250

242251
// Forward the matrix driver response to the incoming message stream.
@@ -258,7 +267,8 @@ impl WidgetDriver {
258267

259268
self.event_forwarding_guard = Some(guard);
260269

261-
let mut matrix = matrix_driver.events();
270+
let mut events_receiver = matrix_driver.events();
271+
let mut to_device_receiver = matrix_driver.to_device_events();
262272
let incoming_msg_tx = incoming_msg_tx.clone();
263273

264274
tokio::spawn(async move {
@@ -269,10 +279,15 @@ impl WidgetDriver {
269279
return;
270280
}
271281

272-
Some(event) = matrix.recv() => {
282+
Some(event) = events_receiver.recv() => {
273283
// Forward all events to the incoming messages stream.
274284
let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event));
275285
}
286+
287+
Some(event) = to_device_receiver.recv() => {
288+
// Forward all events to the incoming messages stream.
289+
let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event));
290+
}
276291
}
277292
}
278293
});

0 commit comments

Comments
 (0)