Skip to content
26 changes: 18 additions & 8 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::io::Cursor;

use anyhow::{Context as _, Result, bail, ensure};
use base64::Engine as _;
use data_encoding::BASE32_NOPAD;
use deltachat_contact_tools::sanitize_bidi_characters;
use iroh_gossip::proto::TopicId;
use mail_builder::headers::HeaderType;
use mail_builder::headers::address::{Address, EmailAddress};
use mail_builder::mime::MimePart;
Expand All @@ -21,14 +23,14 @@ use crate::contact::{Contact, ContactId, Origin};
use crate::context::Context;
use crate::e2ee::EncryptHelper;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::key::self_fingerprint;
use crate::key::{DcKey, SignedPublicKey};
use crate::headerdef::HeaderDef;
use crate::key::{DcKey, SignedPublicKey, self_fingerprint};
use crate::location;
use crate::log::{info, warn};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{SystemMessage, is_hidden};
use crate::param::Param;
use crate::peer_channels::create_iroh_header;
use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg};
use crate::simplify::escape_message_footer_marks;
use crate::stock_str;
use crate::tools::{
Expand Down Expand Up @@ -139,6 +141,9 @@ pub struct MimeFactory {

/// True if the avatar should be attached.
pub attach_selfavatar: bool,

/// This field is used to sustain the topic id of webxdcs needed for peer channels.
webxdc_topic: Option<TopicId>,
}

/// Result of rendering a message, ready to be submitted to a send job.
Expand Down Expand Up @@ -449,7 +454,7 @@ impl MimeFactory {
member_timestamps.is_empty()
|| to.len() + past_members.len() == member_timestamps.len()
);

let webxdc_topic = get_iroh_topic_for_msg(context, msg.id).await?;
let factory = MimeFactory {
from_addr,
from_displayname,
Expand All @@ -469,6 +474,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar,
webxdc_topic,
};
Ok(factory)
}
Expand Down Expand Up @@ -516,6 +522,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar: false,
webxdc_topic: None,
};

Ok(res)
Expand Down Expand Up @@ -1492,7 +1499,7 @@ impl MimeFactory {
}
SystemMessage::IrohNodeAddr => {
headers.push((
"Iroh-Node-Addr",
HeaderDef::IrohNodeAddr.into(),
mail_builder::headers::text::Text::new(serde_json::to_string(
&context
.get_or_try_init_peer_channel()
Expand Down Expand Up @@ -1673,10 +1680,13 @@ impl MimeFactory {
let json = msg.param.get(Param::Arg).unwrap_or_default();
parts.push(context.build_status_update_part(json));
} else if msg.viewtype == Viewtype::Webxdc {
let topic = self
.webxdc_topic
.map(|top| BASE32_NOPAD.encode(top.as_bytes()).to_ascii_lowercase())
.unwrap_or(create_iroh_header(context, msg.id).await?);
headers.push((
"Iroh-Gossip-Topic",
mail_builder::headers::raw::Raw::new(create_iroh_header(context, msg.id).await?)
.into(),
HeaderDef::IrohGossipTopic.get_headername(),
mail_builder::headers::raw::Raw::new(topic).into(),
));
if let (Some(json), _) = context
.render_webxdc_status_update_object(
Expand Down
132 changes: 107 additions & 25 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage;
const PUBLIC_KEY_LENGTH: usize = 32;
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();

/// Store iroh peer channels for the context.
/// Store Iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// iroh router needed for iroh peer channels.
/// Iroh router needed for Iroh peer channels.
pub(crate) router: iroh::protocol::Router,

/// [Gossip] needed for iroh peer channels.
/// [Gossip] needed for Iroh peer channels.
pub(crate) gossip: Gossip,

/// Sequence numbers for gossip channels.
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Iroh {

info!(
ctx,
"IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
"IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
);

// Inform iroh of potentially new node addresses
Expand Down Expand Up @@ -138,17 +138,11 @@ impl Iroh {
Ok(Some(join_rx))
}

/// Add gossip peers to realtime channel if it is already active.
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
/// Add gossip peer to realtime channel if it is already active.
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.router.endpoint().add_node_addr(peer.clone())?;
}

self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
self.router.endpoint().add_node_addr(peer.clone())?;
self.gossip.subscribe(topic, vec![peer.node_id])?;
}
Ok(())
}
Expand Down Expand Up @@ -316,6 +310,17 @@ impl Context {
}
}
}

pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if let Some(iroh) = &*self.iroh.read().await {
info!(
self,
"Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
);
iroh.maybe_add_gossip_peer(topic, peer).await?;
}
Ok(())
}
}

/// Cache a peers [NodeId] for one topic.
Expand Down Expand Up @@ -348,12 +353,13 @@ pub async fn add_gossip_peer_from_header(
return Ok(());
}

let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

info!(
context,
"Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
);
let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
msg_id: instance_id,
Expand All @@ -371,8 +377,7 @@ pub async fn add_gossip_peer_from_header(
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;

let iroh = context.get_or_try_init_peer_channel().await?;
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
context.maybe_add_gossip_peer(topic, node_addr).await?;
Ok(())
}

Expand Down Expand Up @@ -555,9 +560,9 @@ mod tests {
use super::*;
use crate::{
EventType,
chat::send_msg,
chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
message::{Message, Viewtype},
test_utils::TestContextManager,
test_utils::{TestContext, TestContextManager},
};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -924,8 +929,30 @@ mod tests {
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;

let chat = alice.create_chat(bob).await.id;

let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.unwrap();
connect_alice_bob(alice, chat, &mut instance, bob).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_webxdc_resend() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
.await
.unwrap();

// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
Expand All @@ -935,7 +962,60 @@ mod tests {
None,
)
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
.await
.unwrap();

connect_alice_bob(alice, group, &mut instance, bob).await;

// fiona joins late
let fiona = &mut tcm.fiona().await;

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
.await
.unwrap();

resend_msgs(alice, &[instance.id]).await.unwrap();
let msg = alice.pop_sent_msg().await;
let fiona_instance = fiona.recv_msg(&msg).await;
fiona_instance.chat_id.accept(fiona).await.unwrap();
assert!(fiona.ctx.iroh.read().await.is_none());

let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
.await
.unwrap()
.unwrap();
let fiona_advert = fiona.pop_sent_msg().await;
alice.recv_msg_trash(&fiona_advert).await;

fiona_connect_future.await.unwrap();
send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
.await
.unwrap();

loop {
let event = fiona.evtracker.recv().await.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == b"alice -> bob & fiona" {
break;
} else {
panic!(
"Unexpected status update: {}",
String::from_utf8_lossy(&data)
);
}
}
}
}

async fn connect_alice_bob(
alice: &mut TestContext,
alice_chat_id: ChatId,
instance: &mut Message,
bob: &mut TestContext,
) {
send_msg(alice, alice_chat_id, instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;

let webxdc = alice.pop_sent_msg().await;
Expand All @@ -952,17 +1032,19 @@ mod tests {
.unwrap();
let alice_advertisement = alice.pop_sent_msg().await;

send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
.await
.unwrap()
.unwrap();
let bob_advertisement = bob.pop_sent_msg().await;

eprintln!("Receiving advertisements");
bob.recv_msg_trash(&alice_advertisement).await;
alice.recv_msg_trash(&bob_advertisement).await;

eprintln!("Alice waits for connection");
eprintln!("Alice and Bob wait for connection");
alice_advertisement_future.await.unwrap();
bob_advertisement_future.await.unwrap();

// Alice sends ephemeral message
eprintln!("Sending ephemeral message");
Expand Down
2 changes: 1 addition & 1 deletion src/receive_imf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2141,7 +2141,7 @@ RETURNING id
created_db_entries.push(row_id);
}

// check all parts whether they contain a new logging webxdc
// Maybe set logging xdc and add gossip topics for webxdcs.
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
// check if any part contains a webxdc topic id
if part.typ == Viewtype::Webxdc {
Expand Down