Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Commit 900a185

Browse files
committed
Send v2 payjoin
Persist sessions to poll and pending transactions to wallet as in receiving v2 payjoin.
1 parent 4573358 commit 900a185

File tree

2 files changed

+202
-54
lines changed

2 files changed

+202
-54
lines changed

Diff for: mutiny-core/src/nodemanager.rs

+136-46
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::auth::MutinyAuthClient;
22
use crate::labels::LabelStorage;
33
use crate::ldkstorage::CHANNEL_CLOSURE_PREFIX;
44
use crate::logging::LOGGING_KEY;
5-
use crate::payjoin::{Error as PayjoinError, PayjoinStorage, RecvSession};
5+
use crate::payjoin::{random_ohttp_relay, Error as PayjoinError, PayjoinStorage, RecvSession};
66
use crate::utils::{sleep, spawn};
77
use crate::MutinyInvoice;
88
use crate::MutinyWalletConfig;
@@ -56,7 +56,6 @@ use reqwest::Client;
5656
use serde::{Deserialize, Serialize};
5757
use serde_json::Value;
5858
use std::cmp::max;
59-
use std::io::Cursor;
6059
use std::str::FromStr;
6160
use std::sync::atomic::{AtomicBool, Ordering};
6261
#[cfg(not(target_arch = "wasm32"))]
@@ -580,10 +579,14 @@ impl<S: MutinyStorage> NodeManager<S> {
580579

581580
/// Starts a background task to poll payjoin sessions to attempt receiving.
582581
pub(crate) fn resume_payjoins(nm: Arc<NodeManager<S>>) {
583-
let all = nm.storage.list_recv_sessions().unwrap_or_default();
584-
for payjoin in all {
582+
let receives = nm.storage.list_recv_sessions().unwrap_or_default();
583+
for payjoin in receives {
585584
nm.clone().spawn_payjoin_receiver(payjoin);
586585
}
586+
let sends = nm.storage.list_send_sessions().unwrap_or_default();
587+
for payjoin in sends {
588+
nm.clone().spawn_payjoin_sender(payjoin);
589+
}
587590
}
588591

589592
/// Creates a background process that will sync the wallet with the blockchain.
@@ -678,7 +681,7 @@ impl<S: MutinyStorage> NodeManager<S> {
678681
pub async fn start_payjoin_session(
679682
&self,
680683
) -> Result<(Enrolled, payjoin::OhttpKeys), PayjoinError> {
681-
use crate::payjoin::{fetch_ohttp_keys, random_ohttp_relay, PAYJOIN_DIR};
684+
use crate::payjoin::{fetch_ohttp_keys, PAYJOIN_DIR};
682685

683686
log_info!(self.logger, "Starting payjoin session");
684687

@@ -704,7 +707,7 @@ impl<S: MutinyStorage> NodeManager<S> {
704707
))
705708
}
706709

707-
// Send v1 payjoin request
710+
// Send v2 payjoin request
708711
pub async fn send_payjoin(
709712
&self,
710713
uri: Uri<'_, NetworkUnchecked>,
@@ -717,64 +720,151 @@ impl<S: MutinyStorage> NodeManager<S> {
717720
.map_err(|_| MutinyError::IncorrectNetwork)?;
718721
let address = uri.address.clone();
719722
let original_psbt = self.wallet.create_signed_psbt(address, amount, fee_rate)?;
723+
// Track this transaction in the wallet so it shows as an ActivityItem in UI.
724+
// We'll cancel it if and when this original_psbt fallback is replaced with a received payjoin.
725+
self.wallet
726+
.insert_tx(
727+
original_psbt.clone().extract_tx(),
728+
ConfirmationTime::unconfirmed(crate::utils::now().as_secs()),
729+
None,
730+
)
731+
.await?;
732+
720733
let fee_rate = if let Some(rate) = fee_rate {
721734
FeeRate::from_sat_per_vb(rate)
722735
} else {
723736
let sat_per_kwu = self.fee_estimator.get_normal_fee_rate();
724737
FeeRate::from_sat_per_kwu(sat_per_kwu as f32)
725738
};
726739
let fee_rate = payjoin::bitcoin::FeeRate::from_sat_per_kwu(fee_rate.sat_per_kwu() as u64);
727-
let original_psbt = payjoin::bitcoin::psbt::PartiallySignedTransaction::from_str(
728-
&original_psbt.to_string(),
729-
)
730-
.map_err(|_| MutinyError::WalletOperationFailed)?;
731740
log_debug!(self.logger, "Creating payjoin request");
732-
let (req, ctx) =
733-
payjoin::send::RequestBuilder::from_psbt_and_uri(original_psbt.clone(), uri)
734-
.unwrap()
735-
.build_recommended(fee_rate)
736-
.map_err(|_| MutinyError::PayjoinCreateRequest)?
737-
.extract_v1()?;
738-
739-
let client = Client::builder()
740-
.build()
741-
.map_err(|e| MutinyError::Other(e.into()))?;
741+
let req_ctx = payjoin::send::RequestBuilder::from_psbt_and_uri(original_psbt.clone(), uri)
742+
.map_err(|_| MutinyError::PayjoinCreateRequest)?
743+
.build_recommended(fee_rate)
744+
.map_err(|_| MutinyError::PayjoinConfigError)?;
745+
let session = self.storage.store_new_send_session(
746+
labels.clone(),
747+
original_psbt.clone(),
748+
req_ctx.clone(),
749+
)?;
750+
self.spawn_payjoin_sender(session);
751+
Ok(original_psbt.extract_tx().txid())
752+
}
742753

743-
log_debug!(self.logger, "Sending payjoin request");
744-
let res = client
745-
.post(req.url)
746-
.body(req.body)
747-
.header("Content-Type", "text/plain")
748-
.send()
754+
fn spawn_payjoin_sender(&self, session: crate::payjoin::SendSession) {
755+
let wallet = self.wallet.clone();
756+
let logger = self.logger.clone();
757+
let stop = self.stop.clone();
758+
let storage = Arc::new(self.storage.clone());
759+
utils::spawn(async move {
760+
let proposal_psbt = match Self::poll_payjoin_sender(
761+
stop,
762+
wallet.clone(),
763+
storage.clone(),
764+
session.clone(),
765+
)
749766
.await
750-
.map_err(|_| MutinyError::PayjoinCreateRequest)?
751-
.bytes()
767+
{
768+
Ok(psbt) => psbt,
769+
Err(e) => {
770+
// self.wallet cancel_tx
771+
log_error!(logger, "Error polling payjoin sender: {e}");
772+
return;
773+
}
774+
};
775+
776+
let session_clone = session.clone();
777+
match Self::handle_proposal_psbt(
778+
logger.clone(),
779+
wallet,
780+
session_clone.original_psbt,
781+
proposal_psbt,
782+
session_clone.labels,
783+
)
752784
.await
753-
.map_err(|_| MutinyError::PayjoinCreateRequest)?;
785+
{
786+
// Ensure ResponseError is logged with debug formatting
787+
Err(e) => log_error!(logger, "Error handling payjoin proposal: {:?}", e),
788+
Ok(txid) => log_info!(logger, "Payjoin proposal handled: {}", txid),
789+
}
790+
let o_txid = session.clone().original_psbt.clone().extract_tx().txid();
791+
match storage.delete_send_session(session) {
792+
Ok(_) => log_info!(logger, "Deleted payjoin send session: {}", o_txid),
793+
Err(e) => log_error!(logger, "Error deleting payjoin send session: {e}"),
794+
}
795+
});
796+
}
754797

755-
let mut cursor = Cursor::new(res.to_vec());
798+
async fn poll_payjoin_sender(
799+
stop: Arc<AtomicBool>,
800+
wallet: Arc<OnChainWallet<S>>,
801+
storage: Arc<S>,
802+
mut session: crate::payjoin::SendSession,
803+
) -> Result<bitcoin::psbt::Psbt, MutinyError> {
804+
let http = Client::builder()
805+
.build()
806+
.map_err(|_| MutinyError::Other(anyhow!("failed to build http client")))?;
807+
loop {
808+
if stop.load(Ordering::Relaxed) {
809+
return Err(MutinyError::NotRunning);
810+
}
756811

757-
log_debug!(self.logger, "Processing payjoin response");
758-
let proposal_psbt = ctx.process_response(&mut cursor).map_err(|e| {
759-
// unrecognized error contents may only appear in debug logs and will not Display
760-
log_debug!(self.logger, "Payjoin response error: {:?}", e);
761-
e
762-
})?;
812+
if session.expiry < utils::now() {
813+
wallet
814+
.cancel_tx(&session.clone().original_psbt.extract_tx())
815+
.map_err(|_| crate::payjoin::Error::CancelPayjoinTx)?;
816+
storage.delete_send_session(session)?;
817+
return Err(MutinyError::Payjoin(crate::payjoin::Error::SessionExpired));
818+
}
763819

764-
// convert to pdk types
765-
let original_psbt = PartiallySignedTransaction::from_str(&original_psbt.to_string())
766-
.map_err(|_| MutinyError::PayjoinConfigError)?;
767-
let proposal_psbt = PartiallySignedTransaction::from_str(&proposal_psbt.to_string())
768-
.map_err(|_| MutinyError::PayjoinConfigError)?;
820+
let (req, ctx) = session
821+
.req_ctx
822+
.extract_v2(random_ohttp_relay().to_owned())
823+
.map_err(|_| MutinyError::PayjoinConfigError)?;
824+
// extract_v2 mutates the session, so we need to update it in storage to not reuse keys
825+
storage.update_send_session(session.clone())?;
826+
let response = http
827+
.post(req.url)
828+
.header("Content-Type", "message/ohttp-req")
829+
.body(req.body)
830+
.send()
831+
.await
832+
.map_err(|_| MutinyError::Other(anyhow!("failed to parse payjoin response")))?;
833+
let mut reader =
834+
std::io::Cursor::new(response.bytes().await.map_err(|_| {
835+
MutinyError::Other(anyhow!("failed to parse payjoin response"))
836+
})?);
837+
838+
let psbt = ctx
839+
.process_response(&mut reader)
840+
.map_err(MutinyError::PayjoinResponse)?;
841+
if let Some(psbt) = psbt {
842+
let psbt = bitcoin::psbt::Psbt::from_str(&psbt.to_string())
843+
.map_err(|_| MutinyError::Other(anyhow!("psbt conversion failed")))?;
844+
return Ok(psbt);
845+
} else {
846+
log::info!("No response yet for POST payjoin request, retrying some seconds");
847+
std::thread::sleep(std::time::Duration::from_secs(5));
848+
}
849+
}
850+
}
769851

770-
log_debug!(self.logger, "Sending payjoin..");
771-
let tx = self
772-
.wallet
852+
async fn handle_proposal_psbt(
853+
logger: Arc<MutinyLogger>,
854+
wallet: Arc<OnChainWallet<S>>,
855+
original_psbt: PartiallySignedTransaction,
856+
proposal_psbt: PartiallySignedTransaction,
857+
labels: Vec<String>,
858+
) -> Result<Txid, MutinyError> {
859+
log_debug!(logger, "Sending payjoin..");
860+
let original_tx = original_psbt.clone().extract_tx();
861+
let tx = wallet
773862
.send_payjoin(original_psbt, proposal_psbt, labels)
774863
.await?;
775864
let txid = tx.txid();
776-
self.broadcast_transaction(tx).await?;
777-
log_debug!(self.logger, "Payjoin broadcast! TXID: {txid}");
865+
wallet.broadcast_transaction(tx).await?;
866+
wallet.cancel_tx(&original_tx)?;
867+
log_info!(logger, "Payjoin broadcast! TXID: {txid}");
778868
Ok(txid)
779869
}
780870

Diff for: mutiny-core/src/payjoin.rs

+66-8
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ use std::sync::Arc;
33

44
use crate::error::MutinyError;
55
use crate::storage::MutinyStorage;
6-
use bitcoin::Transaction;
6+
use bitcoin::{psbt::Psbt, Transaction, Txid};
77
use core::time::Duration;
88
use gloo_net::websocket::futures::WebSocket;
99
use hex_conservative::DisplayHex;
1010
use once_cell::sync::Lazy;
1111
use payjoin::receive::v2::Enrolled;
1212
use payjoin::OhttpKeys;
13+
use pj::send::RequestContext;
1314
use serde::{Deserialize, Serialize};
1415
use url::Url;
1516

@@ -40,22 +41,46 @@ impl RecvSession {
4041
self.enrolled.pubkey()
4142
}
4243
}
44+
45+
#[derive(Clone, PartialEq, Serialize, Deserialize)]
46+
pub struct SendSession {
47+
pub original_psbt: Psbt,
48+
pub req_ctx: RequestContext,
49+
pub labels: Vec<String>,
50+
pub expiry: Duration,
51+
}
52+
4353
pub trait PayjoinStorage {
4454
fn list_recv_sessions(&self) -> Result<Vec<RecvSession>, MutinyError>;
4555
fn store_new_recv_session(&self, session: Enrolled) -> Result<RecvSession, MutinyError>;
4656
fn update_recv_session(&self, session: RecvSession) -> Result<(), MutinyError>;
4757
fn delete_recv_session(&self, id: &[u8; 33]) -> Result<(), MutinyError>;
58+
59+
fn list_send_sessions(&self) -> Result<Vec<SendSession>, MutinyError>;
60+
fn store_new_send_session(
61+
&self,
62+
labels: Vec<String>,
63+
original_psbt: Psbt,
64+
req_ctx: RequestContext,
65+
) -> Result<SendSession, MutinyError>;
66+
fn update_send_session(&self, session: SendSession) -> Result<(), MutinyError>;
67+
fn delete_send_session(&self, session: SendSession) -> Result<(), MutinyError>;
4868
}
4969

50-
const PAYJOIN_KEY_PREFIX: &str = "recvpj/";
70+
const RECV_PAYJOIN_KEY_PREFIX: &str = "recvpj/";
71+
const SEND_PAYJOIN_KEY_PREFIX: &str = "sendpj/";
5172

52-
fn get_payjoin_key(id: &[u8; 33]) -> String {
53-
format!("{PAYJOIN_KEY_PREFIX}{}", id.as_hex())
73+
fn get_recv_key(id: &[u8; 33]) -> String {
74+
format!("{RECV_PAYJOIN_KEY_PREFIX}{}", id.as_hex())
75+
}
76+
77+
fn get_send_key(original_txid: Txid) -> String {
78+
format!("{RECV_PAYJOIN_KEY_PREFIX}{}", original_txid)
5479
}
5580

5681
impl<S: MutinyStorage> PayjoinStorage for S {
5782
fn list_recv_sessions(&self) -> Result<Vec<RecvSession>, MutinyError> {
58-
let map: HashMap<String, RecvSession> = self.scan(PAYJOIN_KEY_PREFIX, None)?;
83+
let map: HashMap<String, RecvSession> = self.scan(RECV_PAYJOIN_KEY_PREFIX, None)?;
5984
Ok(map.values().map(|v| v.to_owned()).collect())
6085
}
6186

@@ -66,16 +91,49 @@ impl<S: MutinyStorage> PayjoinStorage for S {
6691
expiry: in_24_hours,
6792
payjoin_tx: None,
6893
};
69-
self.set_data(get_payjoin_key(&session.pubkey()), session.clone(), None)
94+
self.set_data(get_recv_key(&session.pubkey()), session.clone(), None)
7095
.map(|_| session)
7196
}
7297

7398
fn update_recv_session(&self, session: RecvSession) -> Result<(), MutinyError> {
74-
self.set_data(get_payjoin_key(&session.pubkey()), session, None)
99+
self.set_data(get_recv_key(&session.pubkey()), session, None)
75100
}
76101

77102
fn delete_recv_session(&self, id: &[u8; 33]) -> Result<(), MutinyError> {
78-
self.delete(&[get_payjoin_key(id)])
103+
self.delete(&[get_recv_key(id)])
104+
}
105+
106+
fn store_new_send_session(
107+
&self,
108+
labels: Vec<String>,
109+
original_psbt: Psbt,
110+
req_ctx: RequestContext,
111+
) -> Result<SendSession, MutinyError> {
112+
let in_24_hours = crate::utils::now() + Duration::from_secs(60 * 60 * 24);
113+
let o_txid = original_psbt.clone().extract_tx().txid();
114+
let session = SendSession {
115+
labels,
116+
original_psbt,
117+
expiry: in_24_hours,
118+
req_ctx,
119+
};
120+
self.set_data(o_txid.to_string(), session.clone(), None)
121+
.map(|_| session)
122+
}
123+
124+
fn list_send_sessions(&self) -> Result<Vec<SendSession>, MutinyError> {
125+
let map: HashMap<String, SendSession> = self.scan(SEND_PAYJOIN_KEY_PREFIX, None)?;
126+
Ok(map.values().map(|v| v.to_owned()).collect())
127+
}
128+
129+
fn update_send_session(&self, session: SendSession) -> Result<(), MutinyError> {
130+
let o_txid = session.clone().original_psbt.extract_tx().txid();
131+
self.set_data(get_send_key(o_txid), session, None)
132+
}
133+
134+
fn delete_send_session(&self, session: SendSession) -> Result<(), MutinyError> {
135+
let o_txid = session.original_psbt.extract_tx().txid();
136+
self.delete(&[get_send_key(o_txid)])
79137
}
80138
}
81139

0 commit comments

Comments
 (0)