Skip to content

Commit 881bd40

Browse files
committed
Implement on-the-fly funding
Implement the on-the-fly funding protocol: when a payment cannot be relayed because of a liquidity issue, we notify the `Peer` actor that we'd like to trigger on-the-fly funding if available. If available, we we send a funding proposal to our peer and keep track of its status. Once a matching funding transaction is signed, we persist this funding attempt and wait for the additional liquidity to be available (once the channel is ready or the splice locked). We will then frequently try to relay the payment to get paid our liquidity fees. If the payment keeps getting rejected, or we cannot connect to our peer, we abandon the payment when it reaches its CLTV expiry, which ensures that the upstream channels are not at risk. When using on-the-fly funding, we use a single channel with our peer. If they try to open another channel while one is available, we reject their request and expect a splice instead.
1 parent 98c2dd1 commit 881bd40

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+3069
-179
lines changed

eclair-core/src/main/resources/reference.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,14 @@ eclair {
339339
]
340340
}
341341

342+
// On-the-fly funding leverages liquidity ads to fund channels with wallet peers based on their payment patterns.
343+
on-the-fly-funding {
344+
wake-up-timeout = 30 seconds
345+
// If our peer doesn't respond to our funding proposal, we must fail the corresponding upstream HTLCs.
346+
// Since MPP may be used, we should use a timeout greater than the MPP timeout.
347+
proposal-timeout = 90 seconds
348+
}
349+
342350
peer-connection {
343351
auth-timeout = 15 seconds // will disconnect if connection authentication doesn't happen within that timeframe
344352
init-timeout = 15 seconds // will disconnect if initialization doesn't happen within that timeframe

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import fr.acinq.eclair.db._
3030
import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy}
3131
import fr.acinq.eclair.io.PeerConnection
3232
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
33+
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
3334
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
3435
import fr.acinq.eclair.router.Announcements.AddressException
3536
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
@@ -90,7 +91,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
9091
purgeInvoicesInterval: Option[FiniteDuration],
9192
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
9293
willFundRates_opt: Option[LiquidityAds.WillFundRates],
93-
wakeUpTimeout: FiniteDuration) {
94+
onTheFlyFundingConfig: OnTheFlyFunding.Config) {
9495
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
9596

9697
val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -663,7 +664,10 @@ object NodeParams extends Logging {
663664
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
664665
),
665666
willFundRates_opt = willFundRates_opt,
666-
wakeUpTimeout = 30 seconds,
667+
onTheFlyFundingConfig = OnTheFlyFunding.Config(
668+
wakeUpTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.wake-up-timeout").getSeconds, TimeUnit.SECONDS),
669+
proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS),
670+
),
667671
)
668672
}
669673
}

eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ object Upstream {
146146
val expiryIn: CltvExpiry = add.cltvExpiry
147147
}
148148
/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
149-
case class Trampoline(received: Seq[Channel]) extends Hot {
149+
case class Trampoline(received: List[Channel]) extends Hot {
150150
override val amountIn: MilliSatoshi = received.map(_.add.amountMsat).sum
151151
// We must use the lowest expiry of the incoming HTLC set.
152152
val expiryIn: CltvExpiry = received.map(_.add.cltvExpiry).min
@@ -165,6 +165,10 @@ object Upstream {
165165

166166
/** Our node is forwarding a single incoming HTLC. */
167167
case class Channel(originChannelId: ByteVector32, originHtlcId: Long, amountIn: MilliSatoshi) extends Cold
168+
object Channel {
169+
def apply(add: UpdateAddHtlc): Channel = Channel(add.channelId, add.id, add.amountMsat)
170+
}
171+
168172
/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
169173
case class Trampoline(originHtlcs: List[Channel]) extends Cold { override val amountIn: MilliSatoshi = originHtlcs.map(_.amountIn).sum }
170174
}

eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ case class ChannelAborted(channel: ActorRef, remoteNodeId: PublicKey, channelId:
5454
/** This event will be sent once a channel has been successfully opened and is ready to process payments. */
5555
case class ChannelOpened(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32) extends ChannelEvent
5656

57+
/** This event is sent once channel_ready or splice_locked have been exchanged. */
58+
case class ChannelReadyForPayments(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, fundingTxIndex: Long) extends ChannelEvent
59+
5760
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortIds: ShortIds, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, commitments: Commitments) extends ChannelEvent {
5861
/**
5962
* We always include the local alias because we must always be able to route based on it.

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
4444
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
4545
import fr.acinq.eclair.db.PendingCommandsDb
4646
import fr.acinq.eclair.io.Peer
47+
import fr.acinq.eclair.io.Peer.LiquidityPurchaseSigned
4748
import fr.acinq.eclair.payment.relay.Relayer
4849
import fr.acinq.eclair.payment.{Bolt11Invoice, PaymentSettlingOnChain}
4950
import fr.acinq.eclair.router.Announcements
@@ -1095,10 +1096,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
10951096
log.info("ignoring outgoing interactive-tx message {} from previous session", msg.getClass.getSimpleName)
10961097
stay()
10971098
}
1098-
case InteractiveTxBuilder.Succeeded(signingSession, commitSig) =>
1099+
case InteractiveTxBuilder.Succeeded(signingSession, commitSig, liquidityPurchase_opt) =>
10991100
log.info(s"splice tx created with fundingTxIndex=${signingSession.fundingTxIndex} fundingTxId=${signingSession.fundingTx.txId}")
11001101
cmd_opt.foreach(cmd => cmd.replyTo ! RES_SPLICE(fundingTxIndex = signingSession.fundingTxIndex, signingSession.fundingTx.txId, signingSession.fundingParams.fundingAmount, signingSession.localCommit.fold(_.spec, _.spec).toLocal))
11011102
remoteCommitSig_opt.foreach(self ! _)
1103+
liquidityPurchase_opt.collect {
1104+
case purchase if !signingSession.fundingParams.isInitiator => peer ! LiquidityPurchaseSigned(d.channelId, signingSession.fundingTx.txId, signingSession.fundingTxIndex, d.commitments.params.remoteParams.htlcMinimum, purchase)
1105+
}
11021106
val d1 = d.copy(spliceStatus = SpliceStatus.SpliceWaitingForSigs(signingSession))
11031107
stay() using d1 storing() sending commitSig
11041108
case f: InteractiveTxBuilder.Failed =>
@@ -2139,6 +2143,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
21392143
}
21402144
}
21412145

2146+
// We tell the peer that the channel is ready to process payments that may be queued.
2147+
if (!shutdownInProgress) {
2148+
val fundingTxIndex = commitments1.active.map(_.fundingTxIndex).min
2149+
peer ! ChannelReadyForPayments(self, remoteNodeId, d.channelId, fundingTxIndex)
2150+
}
2151+
21422152
goto(NORMAL) using d.copy(commitments = commitments1, spliceStatus = spliceStatus1) sending sendQueue
21432153
}
21442154

@@ -2710,6 +2720,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
27102720
if (oldCommitments.availableBalanceForSend != newCommitments.availableBalanceForSend || oldCommitments.availableBalanceForReceive != newCommitments.availableBalanceForReceive) {
27112721
context.system.eventStream.publish(AvailableBalanceChanged(self, newCommitments.channelId, shortIds, newCommitments))
27122722
}
2723+
if (oldCommitments.active.size != newCommitments.active.size) {
2724+
// Some commitments have been deactivated, which means our available balance changed, which may allow forwarding
2725+
// payments that couldn't be forwarded before.
2726+
val fundingTxIndex = newCommitments.active.map(_.fundingTxIndex).min
2727+
peer ! ChannelReadyForPayments(self, remoteNodeId, newCommitments.channelId, fundingTxIndex)
2728+
}
27132729
}
27142730

27152731
private def maybeUpdateMaxHtlcAmount(currentMaxHtlcAmount: MilliSatoshi, newCommitments: Commitments): Unit = {

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTrans
2727
import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession}
2828
import fr.acinq.eclair.channel.publish.TxPublisher.SetChannelId
2929
import fr.acinq.eclair.crypto.ShaChain
30-
import fr.acinq.eclair.io.Peer.OpenChannelResponse
30+
import fr.acinq.eclair.io.Peer.{LiquidityPurchaseSigned, OpenChannelResponse}
3131
import fr.acinq.eclair.wire.protocol._
3232
import fr.acinq.eclair.{MilliSatoshiLong, RealShortChannelId, ToMilliSatoshiConversion, UInt64, randomBytes32}
3333

@@ -339,9 +339,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
339339

340340
case Event(msg: InteractiveTxBuilder.Response, d: DATA_WAIT_FOR_DUAL_FUNDING_CREATED) => msg match {
341341
case InteractiveTxBuilder.SendMessage(_, msg) => stay() sending msg
342-
case InteractiveTxBuilder.Succeeded(status, commitSig) =>
342+
case InteractiveTxBuilder.Succeeded(status, commitSig, liquidityPurchase_opt) =>
343343
d.deferred.foreach(self ! _)
344344
d.replyTo_opt.foreach(_ ! OpenChannelResponse.Created(d.channelId, status.fundingTx.txId, status.fundingTx.tx.localFees.truncateToSatoshi))
345+
liquidityPurchase_opt.collect {
346+
case purchase if !status.fundingParams.isInitiator => peer ! LiquidityPurchaseSigned(d.channelId, status.fundingTx.txId, status.fundingTxIndex, d.channelParams.remoteParams.htlcMinimum, purchase)
347+
}
345348
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_SIGNED(d.channelParams, d.secondRemotePerCommitmentPoint, d.localPushAmount, d.remotePushAmount, status, None)
346349
goto(WAIT_FOR_DUAL_FUNDING_SIGNED) using d1 storing() sending commitSig
347350
case f: InteractiveTxBuilder.Failed =>
@@ -687,9 +690,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
687690
case RbfStatus.RbfInProgress(cmd_opt, _, remoteCommitSig_opt) =>
688691
msg match {
689692
case InteractiveTxBuilder.SendMessage(_, msg) => stay() sending msg
690-
case InteractiveTxBuilder.Succeeded(signingSession, commitSig) =>
693+
case InteractiveTxBuilder.Succeeded(signingSession, commitSig, liquidityPurchase_opt) =>
691694
cmd_opt.foreach(cmd => cmd.replyTo ! RES_BUMP_FUNDING_FEE(rbfIndex = d.previousFundingTxs.length, signingSession.fundingTx.txId, signingSession.fundingTx.tx.localFees.truncateToSatoshi))
692695
remoteCommitSig_opt.foreach(self ! _)
696+
liquidityPurchase_opt.collect {
697+
case purchase if !signingSession.fundingParams.isInitiator => peer ! LiquidityPurchaseSigned(d.channelId, signingSession.fundingTx.txId, signingSession.fundingTxIndex, d.commitments.params.remoteParams.htlcMinimum, purchase)
698+
}
693699
val d1 = d.copy(rbfStatus = RbfStatus.RbfWaitingForSigs(signingSession))
694700
stay() using d1 storing() sending commitSig
695701
case f: InteractiveTxBuilder.Failed =>

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ trait CommonFundingHandlers extends CommonHandlers {
135135
// used to get the final shortChannelId, used in announcements (if minDepth >= ANNOUNCEMENTS_MINCONF this event will fire instantly)
136136
blockchain ! WatchFundingDeeplyBuried(self, commitments.latest.fundingTxId, ANNOUNCEMENTS_MINCONF)
137137
val commitments1 = commitments.modify(_.remoteNextCommitInfo).setTo(Right(channelReady.nextPerCommitmentPoint))
138+
peer ! ChannelReadyForPayments(self, remoteNodeId, commitments.channelId, fundingTxIndex = 0)
138139
DATA_NORMAL(commitments1, shortIds1, None, initialChannelUpdate, None, None, None, SpliceStatus.NoSplice)
139140
}
140141

eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ object InteractiveTxBuilder {
9090

9191
sealed trait Response
9292
case class SendMessage(sessionId: ByteVector32, msg: LightningMessage) extends Response
93-
case class Succeeded(signingSession: InteractiveTxSigningSession.WaitingForSigs, commitSig: CommitSig) extends Response
93+
case class Succeeded(signingSession: InteractiveTxSigningSession.WaitingForSigs, commitSig: CommitSig, liquidityPurchase_opt: Option[LiquidityAds.Purchase]) extends Response
9494
sealed trait Failed extends Response { def cause: ChannelException }
9595
case class LocalFailure(cause: ChannelException) extends Failed
9696
case class RemoteFailure(cause: ChannelException) extends Failed
@@ -370,12 +370,24 @@ object InteractiveTxBuilder {
370370
// Note that pending HTLCs are ignored: splices only affect the main outputs.
371371
val nextLocalBalance = purpose.previousLocalBalance + fundingParams.localContribution - localPushAmount + remotePushAmount - liquidityFee
372372
val nextRemoteBalance = purpose.previousRemoteBalance + fundingParams.remoteContribution - remotePushAmount + localPushAmount + liquidityFee
373+
val liquidityPaymentTypeOk = liquidityPurchase_opt match {
374+
case Some(l) if !fundingParams.isInitiator => l.paymentDetails match {
375+
case LiquidityAds.PaymentDetails.FromChannelBalance | _: LiquidityAds.PaymentDetails.FromChannelBalanceForFutureHtlc => true
376+
// If our peer has enough balance to pay the liquidity fees, they shouldn't use future HTLCs which
377+
// involves trust: they should directly pay from their channel balance.
378+
case _: LiquidityAds.PaymentDetails.FromFutureHtlc | _: LiquidityAds.PaymentDetails.FromFutureHtlcWithPreimage => nextRemoteBalance < l.fees.total
379+
}
380+
case _ => true
381+
}
373382
if (fundingParams.fundingAmount < fundingParams.dustLimit) {
374383
replyTo ! LocalFailure(FundingAmountTooLow(channelParams.channelId, fundingParams.fundingAmount, fundingParams.dustLimit))
375384
Behaviors.stopped
376385
} else if (nextLocalBalance < 0.msat || nextRemoteBalance < 0.msat) {
377386
replyTo ! LocalFailure(InvalidFundingBalances(channelParams.channelId, fundingParams.fundingAmount, nextLocalBalance, nextRemoteBalance))
378387
Behaviors.stopped
388+
} else if (!liquidityPaymentTypeOk) {
389+
replyTo ! LocalFailure(InvalidLiquidityAdsPaymentType(channelParams.channelId, liquidityPurchase_opt.get.paymentDetails.paymentType, Set(LiquidityAds.PaymentType.FromChannelBalance, LiquidityAds.PaymentType.FromChannelBalanceForFutureHtlc)))
390+
Behaviors.stopped
379391
} else {
380392
val actor = new InteractiveTxBuilder(replyTo, sessionId, nodeParams, channelParams, fundingParams, purpose, localPushAmount, remotePushAmount, liquidityPurchase_opt, wallet, stash, context)
381393
actor.start()
@@ -805,7 +817,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
805817
Behaviors.receiveMessagePartial {
806818
case SignTransactionResult(signedTx) =>
807819
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
808-
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
820+
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig, liquidityPurchase_opt)
809821
Behaviors.stopped
810822
case WalletFailure(t) =>
811823
log.error("could not sign funding transaction: ", t)

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ trait Databases {
4444
def peers: PeersDb
4545
def payments: PaymentsDb
4646
def pendingCommands: PendingCommandsDb
47+
def onTheFlyFunding: OnTheFlyFundingDb
4748
//@formatter:on
4849
}
4950

@@ -65,6 +66,7 @@ object Databases extends Logging {
6566
peers: SqlitePeersDb,
6667
payments: SqlitePaymentsDb,
6768
pendingCommands: SqlitePendingCommandsDb,
69+
onTheFlyFunding: SqliteOnTheFlyFundingDb,
6870
private val backupConnection: Connection) extends Databases with FileBackup {
6971
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
7072
statement => {
@@ -83,6 +85,7 @@ object Databases extends Logging {
8385
peers = new SqlitePeersDb(eclairJdbc),
8486
payments = new SqlitePaymentsDb(eclairJdbc),
8587
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
88+
onTheFlyFunding = new SqliteOnTheFlyFundingDb(eclairJdbc),
8689
backupConnection = eclairJdbc
8790
)
8891
}
@@ -94,6 +97,7 @@ object Databases extends Logging {
9497
peers: PgPeersDb,
9598
payments: PgPaymentsDb,
9699
pendingCommands: PgPendingCommandsDb,
100+
onTheFlyFunding: PgOnTheFlyFundingDb,
97101
dataSource: HikariDataSource,
98102
lock: PgLock) extends Databases with ExclusiveLock {
99103
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
@@ -154,6 +158,7 @@ object Databases extends Logging {
154158
peers = new PgPeersDb,
155159
payments = new PgPaymentsDb,
156160
pendingCommands = new PgPendingCommandsDb,
161+
onTheFlyFunding = new PgOnTheFlyFundingDb,
157162
dataSource = ds,
158163
lock = lock)
159164

eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
8989
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _, _) =>
9090
channelsDb.updateChannelMeta(fromChannelId, ChannelEvent.EventType.PaymentReceived)
9191
channelsDb.updateChannelMeta(toChannelId, ChannelEvent.EventType.PaymentSent)
92+
case OnTheFlyFundingPaymentRelayed(_, incoming, outgoing) =>
93+
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
94+
outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent))
9295
}
9396
auditDb.add(e)
9497

0 commit comments

Comments
 (0)