Skip to content

Commit aed3714

Browse files
committed
WIP: add RFQ system HTLC accepted event notification
1 parent 6c781c7 commit aed3714

File tree

6 files changed

+130
-23
lines changed

6 files changed

+130
-23
lines changed

itest/rfq_test.go

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package itest
22

33
import (
44
"context"
5+
"fmt"
6+
"math"
57
"time"
68

79
"github.com/btcsuite/btcd/btcutil"
@@ -10,6 +12,7 @@ import (
1012
"github.com/lightninglabs/taproot-assets/taprpc/rfqrpc"
1113
"github.com/lightningnetwork/lnd/chainreg"
1214
"github.com/lightningnetwork/lnd/lnrpc"
15+
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
1316
"github.com/lightningnetwork/lnd/lntest"
1417
"github.com/lightningnetwork/lnd/lntest/node"
1518
"github.com/lightningnetwork/lnd/lntest/wait"
@@ -96,9 +99,8 @@ func testRfqHtlcIntercept(t *harnessTest) {
9699
event, err := carolEventNtfns.Recv()
97100
require.NoError(t.t, err)
98101

99-
if _, ok := event.Event.(*rfqrpc.RfqEvent_IncomingAcceptQuote); ok {
100-
return nil
101-
}
102+
_, ok := event.Event.(*rfqrpc.RfqEvent_IncomingAcceptQuote)
103+
require.True(t.t, ok, "unexpected event: %v", event)
102104

103105
return nil
104106
}, defaultWaitTimeout)
@@ -159,13 +161,53 @@ func testRfqHtlcIntercept(t *harnessTest) {
159161
})
160162
invoice := ts.CarolLnd.RPC.LookupInvoice(addInvoiceResp.RHash)
161163

162-
// Alice pays the invoice.
163-
t.lndHarness.CompletePaymentRequests(
164-
ts.AliceLnd, []string{invoice.PaymentRequest},
164+
// Register to receive RFQ events from Bob's tapd node. We'll use this
165+
// to wait for Bob to receive the HTLC with the asset transfer specific
166+
// scid.
167+
bobEventNtfns, err := ts.BobTapd.SubscribeRfqEventNtfns(
168+
ctxb, &rfqrpc.SubscribeRfqEventNtfnsRequest{},
165169
)
170+
require.NoError(t.t, err)
171+
172+
// Alice pays the invoice.
173+
t.Log("Alice paying invoice")
174+
req := &routerrpc.SendPaymentRequest{
175+
PaymentRequest: invoice.PaymentRequest,
176+
TimeoutSeconds: int32(wait.PaymentTimeout.Seconds()),
177+
FeeLimitMsat: math.MaxInt64,
178+
}
179+
ts.AliceLnd.RPC.SendPayment(req)
180+
t.Log("Alice payment sent")
166181

167182
// At this point Bob should have received a HTLC with the asset transfer
168-
// specific scid.
183+
// specific scid. We'll wait for Bob to publish an accept HTLC event and
184+
// then validate it against the accepted quote.
185+
waitErr = wait.NoError(func() error {
186+
t.Log("Waiting for Bob to receive HTLC")
187+
188+
event, err := bobEventNtfns.Recv()
189+
require.NoError(t.t, err)
190+
191+
acceptHtlc, ok := event.Event.(*rfqrpc.RfqEvent_AcceptHtlc)
192+
if ok {
193+
require.Equal(
194+
t.t, acceptedQuote.Scid,
195+
acceptHtlc.AcceptHtlc.Scid,
196+
)
197+
t.Log("Bob has accepted the HTLC")
198+
return nil
199+
}
200+
201+
return fmt.Errorf("unexpected event: %v", event)
202+
}, defaultWaitTimeout)
203+
require.NoError(t.t, waitErr)
204+
205+
// Close event streams.
206+
err = carolEventNtfns.CloseSend()
207+
require.NoError(t.t, err)
208+
209+
err = bobEventNtfns.CloseSend()
210+
require.NoError(t.t, err)
169211
}
170212

171213
// newLndNode creates a new lnd node with the given name and funds its wallet

rfq/manager.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/btcsuite/btcd/btcec/v2"
10+
"github.com/lightninglabs/lndclient"
1011
"github.com/lightninglabs/taproot-assets/asset"
1112
"github.com/lightninglabs/taproot-assets/fn"
1213
"github.com/lightninglabs/taproot-assets/rfqmsg"
@@ -77,6 +78,10 @@ type Manager struct {
7778
// messages.
7879
outgoingMessages chan rfqmsg.OutgoingMsg
7980

81+
// acceptHtlcEvents is a channel which is populated with accept HTLCs
82+
// events.
83+
acceptHtlcEvents chan AcceptHtlcEvent
84+
8085
// peerAcceptedQuotes is a map of serialised short channel IDs (SCIDs)
8186
// to associated accepted quotes. These quotes have been accepted by
8287
// peer nodes and are therefore available for use in buying assets.
@@ -105,6 +110,7 @@ func NewManager(cfg ManagerCfg) (Manager, error) {
105110
incomingMessages: make(chan rfqmsg.IncomingMsg),
106111
outgoingMessages: make(chan rfqmsg.OutgoingMsg),
107112

113+
acceptHtlcEvents: make(chan AcceptHtlcEvent),
108114
peerAcceptedQuotes: make(map[SerialisedScid]rfqmsg.Accept),
109115

110116
subscribers: make(
@@ -124,8 +130,9 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
124130

125131
// Initialise and start the order handler.
126132
m.orderHandler, err = NewOrderHandler(OrderHandlerCfg{
127-
CleanupInterval: CacheCleanupInterval,
128-
HtlcInterceptor: m.cfg.HtlcInterceptor,
133+
CleanupInterval: CacheCleanupInterval,
134+
HtlcInterceptor: m.cfg.HtlcInterceptor,
135+
AcceptHtlcEvents: m.acceptHtlcEvents,
129136
})
130137
if err != nil {
131138
return fmt.Errorf("error initializing RFQ order handler: %w",
@@ -339,6 +346,10 @@ func (m *Manager) mainEventLoop() {
339346
err)
340347
}
341348

349+
case acceptHtlcEvent := <-m.acceptHtlcEvents:
350+
// Handle a HTLC accept event. Notify any subscribers.
351+
m.publishSubscriberEvent(&acceptHtlcEvent)
352+
342353
// Handle errors from the negotiator.
343354
case err := <-m.negotiator.ErrChan:
344355
log.Warnf("Negotiator has encountered an error: %v",
@@ -487,6 +498,7 @@ func (m *Manager) publishSubscriberEvent(event fn.Event) {
487498
// IncomingAcceptQuoteEvent is an event that is broadcast when the RFQ manager
488499
// receives an accept quote message from a peer.
489500
type IncomingAcceptQuoteEvent struct {
501+
// timestamp is the event creation UTC timestamp.
490502
timestamp time.Time
491503

492504
// Accept is the accepted quote.
@@ -511,3 +523,35 @@ func (q *IncomingAcceptQuoteEvent) Timestamp() time.Time {
511523
// Ensure that the IncomingAcceptQuoteEvent struct implements the Event
512524
// interface.
513525
var _ fn.Event = (*IncomingAcceptQuoteEvent)(nil)
526+
527+
// AcceptHtlcEvent is an event that is sent to the accept HTLCs channel when
528+
// an HTLC is accepted.
529+
type AcceptHtlcEvent struct {
530+
// Timestamp is the unix timestamp at which the HTLC was accepted.
531+
timestamp uint64
532+
533+
// Htlc is the intercepted HTLC.
534+
Htlc lndclient.InterceptedHtlc
535+
536+
// ChannelRemit is the channel remit that the HTLC complies with.
537+
ChannelRemit ChannelRemit
538+
}
539+
540+
// NewAcceptHtlcEvent creates a new AcceptedHtlcEvent.
541+
func NewAcceptHtlcEvent(htlc lndclient.InterceptedHtlc,
542+
channelRemit ChannelRemit) AcceptHtlcEvent {
543+
544+
return AcceptHtlcEvent{
545+
timestamp: uint64(time.Now().UTC().Unix()),
546+
Htlc: htlc,
547+
ChannelRemit: channelRemit,
548+
}
549+
}
550+
551+
// Timestamp returns the event creation UTC timestamp.
552+
func (q *AcceptHtlcEvent) Timestamp() time.Time {
553+
return time.Unix(int64(q.timestamp), 0).UTC()
554+
}
555+
556+
// Ensure that the AcceptedHtlcEvent struct implements the Event interface.
557+
var _ fn.Event = (*AcceptHtlcEvent)(nil)

rfq/order.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type OrderHandlerCfg struct {
8585
// HtlcInterceptor is the HTLC interceptor. This component is used to
8686
// intercept and accept/reject HTLCs.
8787
HtlcInterceptor HtlcInterceptor
88+
89+
// AcceptHtlcEvents is a channel that receives accepted HTLCs.
90+
AcceptHtlcEvents chan<- AcceptHtlcEvent
8891
}
8992

9093
// OrderHandler orchestrates management of accepted quote bundles. It monitors
@@ -157,7 +160,9 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context,
157160
}, nil
158161
}
159162

160-
log.Info("HTLC complies with channel remit. Accepting HTLC")
163+
log.Debug("HTLC complies with channel remit.")
164+
acceptHtlcEvent := NewAcceptHtlcEvent(htlc, *channelRemit)
165+
h.cfg.AcceptHtlcEvents <- acceptHtlcEvent
161166

162167
return &lndclient.InterceptedHtlcResponse{
163168
Action: lndclient.InterceptorActionResume,

rfq/stream.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rfq
33
import (
44
"context"
55
"fmt"
6-
"math/rand"
76
"sync"
87

98
"github.com/lightninglabs/lndclient"
@@ -43,8 +42,6 @@ type StreamHandler struct {
4342
// stream handler.
4443
cfg StreamHandlerCfg
4544

46-
randId int
47-
4845
// recvRawMessages is a channel that receives incoming raw peer
4946
// messages.
5047
recvRawMessages <-chan lndclient.CustomMessage
@@ -81,15 +78,9 @@ func NewStreamHandler(ctx context.Context, cfg StreamHandlerCfg,
8178
"messages via peer message porter: %w", err)
8279
}
8380

84-
minRand := 100
85-
maxRand := 1000
86-
randId := rand.Intn(maxRand-minRand+1) + minRand
87-
8881
return &StreamHandler{
8982
cfg: cfg,
9083

91-
randId: randId,
92-
9384
recvRawMessages: msgChan,
9485
errRecvRawMessages: peerMsgErrChan,
9586

@@ -132,9 +123,10 @@ func (h *StreamHandler) handleIncomingQuoteRequest(
132123

133124
h.seenMessagesMtx.Unlock()
134125

135-
log.Debugf("(%d) Stream handling incoming message (msg_type=%T, msg_id=%s, "+
136-
"origin_peer=%s, self=%s)", h.randId, quoteRequest, quoteRequest.ID.String(),
137-
quoteRequest.MsgPeer(), h.cfg.LightningSelfId)
126+
log.Debugf("Stream handling incoming message (msg_type=%T, "+
127+
"msg_id=%s, origin_peer=%s, self=%s)", quoteRequest,
128+
quoteRequest.ID.String(), quoteRequest.MsgPeer(),
129+
h.cfg.LightningSelfId)
138130

139131
// Send the quote request to the RFQ manager.
140132
var msg rfqmsg.IncomingMsg = quoteRequest

rpcserver.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4771,6 +4771,17 @@ func marshallRfqEvent(eventInterface fn.Event) (*rfqrpc.RfqEvent, error) {
47714771
return &rfqrpc.RfqEvent{
47724772
Event: eventRpc,
47734773
}, nil
4774+
4775+
case *rfq.AcceptHtlcEvent:
4776+
eventRpc := &rfqrpc.RfqEvent_AcceptHtlc{
4777+
AcceptHtlc: &rfqrpc.AcceptHtlcEvent{
4778+
Timestamp: uint64(timestamp),
4779+
Scid: uint64(event.ChannelRemit.Scid),
4780+
},
4781+
}
4782+
return &rfqrpc.RfqEvent{
4783+
Event: eventRpc,
4784+
}, nil
47744785
}
47754786

47764787
return nil, fmt.Errorf("unknown RFQ event type: %T", eventInterface)

taprpc/rfqrpc/rfq.proto

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ message QueryRfqAcceptedQuotesResponse {
105105
repeated AcceptedQuote accepted_quotes = 1;
106106
}
107107

108+
message SubscribeRfqEventNtfnsRequest {
109+
}
110+
108111
message IncomingAcceptQuoteEvent {
109112
// Unix timestamp.
110113
uint64 timestamp = 1;
@@ -113,13 +116,23 @@ message IncomingAcceptQuoteEvent {
113116
AcceptedQuote accepted_quote = 2;
114117
}
115118

116-
message SubscribeRfqEventNtfnsRequest {
119+
message AcceptHtlcEvent {
120+
// Unix timestamp.
121+
uint64 timestamp = 1;
122+
123+
// scid is the short channel ID of the channel over which the payment for
124+
// the quote is made.
125+
uint64 scid = 2;
117126
}
118127

119128
message RfqEvent {
120129
oneof event {
121130
// incoming_accept_quote is an event that is sent when an incoming
122131
// accept quote message is received.
123132
IncomingAcceptQuoteEvent incoming_accept_quote = 1;
133+
134+
// accept_htlc is an event that is sent when a HTLC is accepted by the
135+
// RFQ service.
136+
AcceptHtlcEvent accept_htlc = 2;
124137
}
125138
}

0 commit comments

Comments
 (0)