Skip to content

Commit 1e1498e

Browse files
committed
WIP
1 parent c8807f7 commit 1e1498e

File tree

6 files changed

+111
-99
lines changed

6 files changed

+111
-99
lines changed

rfqmsg/accept.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ type Accept struct {
152152
// NewAcceptFromRequest creates a new instance of a quote accept message given
153153
// a quote request message.
154154
func NewAcceptFromRequest(request Request, askingPrice lnwire.MilliSatoshi,
155-
expiry uint64, sig [64]byte) Accept {
155+
expiry uint64) Accept {
156+
157+
var sig [64]byte
156158

157159
return Accept{
158160
Peer: request.Peer,

rfqservice/negotiator.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ type NegotiatorCfg struct {
1515
PriceOracle PriceOracle
1616
}
1717

18-
// Negotiator is a struct that handles the negotiation of quotes. It is a
19-
// subsystem of the RFQ system. It determines whether a quote is accepted or
20-
// rejected.
18+
// Negotiator is a struct that handles the negotiation of quotes. It is a RFQ
19+
// subsystem. It determines whether a quote is accepted or rejected.
2120
type Negotiator struct {
2221
startOnce sync.Once
2322
stopOnce sync.Once
@@ -50,7 +49,7 @@ func NewNegotiator(cfg NegotiatorCfg,
5049

5150
// queryPriceOracle queries the price oracle for the asking price.
5251
func (n *Negotiator) queryPriceOracle(
53-
req rfqmsg.Request) (*PriceOracleSuggestedRate, error) {
52+
req rfqmsg.Request) (*OracleAskingPriceResponse, error) {
5453

5554
ctx, cancel := n.WithCtxQuitNoTimeout()
5655
defer cancel()
@@ -66,46 +65,47 @@ func (n *Negotiator) queryPriceOracle(
6665
return res, nil
6766
}
6867

69-
// handlePriceOracleResponse handles the response from the price oracle.
70-
func (n *Negotiator) handlePriceOracleResponse(
71-
request rfqmsg.Request, oracleResponse *PriceOracleSuggestedRate) error {
68+
// handlePriceOracleResponse handles a response from the price oracle.
69+
func (n *Negotiator) handlePriceOracleResponse(request rfqmsg.Request,
70+
oracleResponse *OracleAskingPriceResponse) error {
7271

73-
// If the suggested rate is nil, then we will return the error message
72+
// If the asking price is nil, then we will return the error message
7473
// supplied by the price oracle.
7574
if oracleResponse.AskingPrice == nil {
7675
rejectMsg := rfqmsg.NewRejectMsg(
77-
request.Peer, request.ID, oracleResponse.Err,
76+
request.Peer, request.ID, *oracleResponse.Err,
7877
)
7978
var msg rfqmsg.OutgoingMsg = &rejectMsg
8079

8180
sendSuccess := fn.SendOrQuit(n.outgoingMessages, msg, n.Quit)
8281
if !sendSuccess {
83-
return fmt.Errorf("negotiator failed to send reject " +
84-
"message")
82+
return fmt.Errorf("negotiator failed to add reject " +
83+
"message to the outgoing messages channel")
8584
}
8685
}
8786

88-
// If the suggested rate is not nil, then we can proceed to respond
89-
// with an accept message.
90-
var sig [64]byte
87+
// TODO(ffranr): Ensure that the expiry time is valid and sufficient.
88+
89+
// If the asking price is not nil, then we can proceed to respond with
90+
// an accept message.
9191
acceptMsg := rfqmsg.NewAcceptFromRequest(
92-
request, *oracleResponse.AskingPrice, oracleResponse.Expiry, sig,
92+
request, *oracleResponse.AskingPrice, oracleResponse.Expiry,
9393
)
9494
var msg rfqmsg.OutgoingMsg = &acceptMsg
9595

9696
sendSuccess := fn.SendOrQuit(n.outgoingMessages, msg, n.Quit)
9797
if !sendSuccess {
98-
return fmt.Errorf("negotiator failed to populate accept message " +
99-
"channel")
98+
return fmt.Errorf("negotiator failed to add accept message " +
99+
"to the outgoing messages channel")
100100
}
101101

102102
return nil
103103
}
104104

105105
// HandleIncomingQuoteRequest handles an incoming quote request.
106106
func (n *Negotiator) HandleIncomingQuoteRequest(req rfqmsg.Request) error {
107-
// If there is no price oracle available, then we cannot proceed with the
108-
// negotiation. We will reject the quote request with an error.
107+
// If there is no price oracle available, then we cannot proceed with
108+
// the negotiation. We will reject the quote request with an error.
109109
if n.cfg.PriceOracle == nil {
110110
rejectMsg := rfqmsg.NewRejectMsg(
111111
req.Peer, req.ID, rfqmsg.ErrPriceOracleUnavailable,
@@ -114,7 +114,8 @@ func (n *Negotiator) HandleIncomingQuoteRequest(req rfqmsg.Request) error {
114114

115115
sendSuccess := fn.SendOrQuit(n.outgoingMessages, msg, n.Quit)
116116
if !sendSuccess {
117-
return fmt.Errorf("negotiator failed to send reject message")
117+
return fmt.Errorf("negotiator failed to send reject " +
118+
"message")
118119
}
119120
}
120121

@@ -174,6 +175,7 @@ func (n *Negotiator) Start() error {
174175
func (n *Negotiator) Stop() error {
175176
log.Info("Stopping RFQ subsystem: quote negotiator")
176177

178+
// Stop the main event loop.
177179
close(n.Quit)
178180
return nil
179181
}

rfqservice/oracle.go

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,17 @@ import (
1414
"google.golang.org/grpc/credentials"
1515
)
1616

17-
// PriceOracleSuggestedRate is a struct that holds the price oracle's suggested
18-
// exchange rate for an asset.
19-
type PriceOracleSuggestedRate struct {
20-
// AssetID is the asset ID.
21-
AssetID *asset.ID
22-
23-
// AssetGroupKey is the asset group key.
24-
AssetGroupKey *btcec.PublicKey
25-
26-
// AssetAmount is the asset amount.
27-
AssetAmount uint64
28-
17+
// OracleAskingPriceResponse is a struct that holds the price oracle's suggested
18+
// asking price for an asset.
19+
type OracleAskingPriceResponse struct {
2920
// AskingPrice is the asking price of the quote.
3021
AskingPrice *lnwire.MilliSatoshi
3122

3223
// Expiry is the asking price expiry lifetime unix timestamp.
3324
Expiry uint64
3425

3526
// Err is the error returned by the price oracle service.
36-
Err rfqmsg.RejectErr
27+
Err *rfqmsg.RejectErr
3728
}
3829

3930
// PriceOracle is an interface that provides exchange rate information for
@@ -42,7 +33,7 @@ type PriceOracle interface {
4233
// QueryAskingPrice returns the asking price for the given asset amount.
4334
QueryAskingPrice(ctx context.Context, assetId *asset.ID,
4435
assetGroupKey *btcec.PublicKey, assetAmount uint64,
45-
bidPrice *lnwire.MilliSatoshi) (*PriceOracleSuggestedRate,
36+
bidPrice *lnwire.MilliSatoshi) (*OracleAskingPriceResponse,
4637
error)
4738
}
4839

@@ -85,7 +76,7 @@ func NewRpcPriceOracle(addr url.URL) (*RpcPriceOracle, error) {
8576
// QueryAskingPrice returns the asking price for the given asset amount.
8677
func (r *RpcPriceOracle) QueryAskingPrice(ctx context.Context,
8778
assetId *asset.ID, assetGroupKey *btcec.PublicKey, assetAmount uint64,
88-
bidPrice *lnwire.MilliSatoshi) (*PriceOracleSuggestedRate, error) {
79+
bidPrice *lnwire.MilliSatoshi) (*OracleAskingPriceResponse, error) {
8980

9081
//// Call the external oracle service to get the exchange rate.
9182
//conn := getClientConn(ctx, false)
@@ -111,18 +102,15 @@ func NewMockPriceOracle(rateLifetime uint64) *MockPriceOracle {
111102

112103
// QueryAskingPrice returns the asking price for the given asset amount.
113104
func (m *MockPriceOracle) QueryAskingPrice(_ context.Context,
114-
assetId *asset.ID, assetGroupKey *btcec.PublicKey, assetAmt uint64,
115-
bidPrice *lnwire.MilliSatoshi) (*PriceOracleSuggestedRate, error) {
105+
_ *asset.ID, _ *btcec.PublicKey, _ uint64,
106+
bidPrice *lnwire.MilliSatoshi) (*OracleAskingPriceResponse, error) {
116107

117108
// Calculate the rate expiry lifetime.
118109
expiry := uint64(time.Now().Unix()) + m.rateLifetime
119110

120-
return &PriceOracleSuggestedRate{
121-
AssetID: assetId,
122-
AssetGroupKey: assetGroupKey,
123-
AssetAmount: assetAmt,
124-
AskingPrice: bidPrice,
125-
Expiry: expiry,
111+
return &OracleAskingPriceResponse{
112+
AskingPrice: bidPrice,
113+
Expiry: expiry,
126114
}, nil
127115
}
128116

rfqservice/order.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ type SerialisedScid uint64
1818
// ChannelRemit is a struct that holds the terms which determine whether a
1919
// channel HTLC is accepted or rejected.
2020
type ChannelRemit struct {
21-
// Scid is the serialised short channel ID (SCID) of the channel for
21+
// Scid is the serialised short channel ID (SCID) of the channel to
2222
// which the remit applies.
2323
Scid SerialisedScid
2424

2525
// AssetAmount is the amount of the tap asset that is being requested.
2626
AssetAmount uint64
2727

28-
// MinimumChannelPayment is the minimum number of millisatoshis that must be
29-
// sent in the HTLC.
28+
// MinimumChannelPayment is the minimum number of millisatoshis that
29+
// must be sent in the HTLC.
3030
MinimumChannelPayment lnwire.MilliSatoshi
3131

3232
// Expiry is the asking price expiry lifetime unix timestamp.
@@ -51,23 +51,25 @@ func NewChannelRemit(quoteAccept rfqmsg.Accept) (*ChannelRemit, error) {
5151
func (c *ChannelRemit) CheckHtlcCompliance(
5252
htlc lndclient.InterceptedHtlc) error {
5353

54+
// Check that the channel SCID is as expected.
55+
htlcScid := SerialisedScid(htlc.OutgoingChannelID.ToUint64())
56+
if htlcScid != c.Scid {
57+
return fmt.Errorf("htlc outgoing channel ID does not match "+
58+
"remit's SCID (htlc_scid=%d, remit_scid=%d)", htlcScid,
59+
c.Scid)
60+
}
61+
5462
// Check that the HTLC amount is at least the minimum acceptable amount.
5563
if htlc.AmountOutMsat <= c.MinimumChannelPayment {
5664
return fmt.Errorf("htlc out amount is less than the remit's "+
5765
"minimum (htlc_out_msat=%d, remit_min_msat=%d)",
5866
htlc.AmountOutMsat, c.MinimumChannelPayment)
5967
}
6068

61-
// Check that the channel SCID is as expected.
62-
htlcScid := SerialisedScid(htlc.OutgoingChannelID.ToUint64())
63-
if htlcScid != c.Scid {
64-
return fmt.Errorf("htlc outgoing channel ID does not match "+
65-
"remit's SCID (htlc_scid=%d, remit_scid=%d)", htlcScid, c.Scid)
66-
}
67-
6869
// Lastly, check to ensure that the channel remit has not expired.
6970
if time.Now().Unix() > int64(c.Expiry) {
70-
return fmt.Errorf("channel remit has expired (expiry=%d)", c.Expiry)
71+
return fmt.Errorf("channel remit has expired (expiry=%d)",
72+
c.Expiry)
7173
}
7274

7375
return nil
@@ -76,19 +78,18 @@ func (c *ChannelRemit) CheckHtlcCompliance(
7678
// OrderHandlerCfg is a struct that holds the configuration parameters for the
7779
// order handler service.
7880
type OrderHandlerCfg struct {
79-
// CleanupInterval is the interval at which the order
80-
// handler cleans up expired accepted quotes from its local cache.
81+
// CleanupInterval is the interval at which the order handler cleans up
82+
// expired accepted quotes from its local cache.
8183
CleanupInterval time.Duration
8284

8385
// HtlcInterceptor is the HTLC interceptor. This component is used to
8486
// intercept and accept/reject HTLCs.
8587
HtlcInterceptor HtlcInterceptor
8688
}
8789

88-
// OrderHandler orchestrates management of accepted RFQ (Request For Quote)
89-
// bundles. It monitors HTLCs (Hash Time Locked Contracts), determining
90-
// acceptance or rejection based on compliance with the terms of the associated
91-
// quote.
90+
// OrderHandler orchestrates management of accepted quote bundles. It monitors
91+
// HTLCs (Hash Time Locked Contracts), and determines acceptance/rejection based
92+
// on the terms of the associated accepted quote.
9293
type OrderHandler struct {
9394
startOnce sync.Once
9495
stopOnce sync.Once
@@ -97,7 +98,7 @@ type OrderHandler struct {
9798
cfg OrderHandlerCfg
9899

99100
// channelRemits is a map of serialised short channel IDs (SCIDs) to
100-
// associated active channel quote remits.
101+
// associated active channel remits.
101102
channelRemits map[SerialisedScid]ChannelRemit
102103

103104
// channelRemitsMtx guards the channelRemits map.
@@ -241,7 +242,7 @@ func (h *OrderHandler) RegisterChannelRemit(quoteAccept rfqmsg.Accept) error {
241242

242243
// FetchChannelRemit fetches a channel remit given a serialised SCID. If a
243244
// channel remit is not found, false is returned. Expired channel remits are
244-
// also not returned and are removed from the cache.
245+
// not returned and are removed from the cache.
245246
func (h *OrderHandler) FetchChannelRemit(scid SerialisedScid) (*ChannelRemit,
246247
bool) {
247248

@@ -278,15 +279,16 @@ func (h *OrderHandler) cleanupStaleChannelRemits() {
278279
}
279280

280281
if staleCounter > 0 {
281-
log.Tracef("Removed %d stale channel remits from the order "+
282-
"handler", staleCounter)
282+
log.Tracef("Removed stale channel remits from the order "+
283+
"handler: (count=%d)", staleCounter)
283284
}
284285
}
285286

286287
// Stop stops the handler.
287288
func (h *OrderHandler) Stop() error {
288289
log.Info("Stopping RFQ subsystem: order handler")
289290

291+
// Stop the main event loop.
290292
close(h.Quit)
291293
return nil
292294
}

rfqservice/rfq.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ func (m *Manager) Stop() error {
183183
m.stopOnce.Do(func() {
184184
log.Info("Stopping RFQ system")
185185
stopErr = m.stopSubsystems()
186+
187+
// Stop the main event loop.
188+
close(m.Quit)
186189
})
187190

188191
return stopErr
@@ -221,8 +224,8 @@ func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error {
221224
case *rfqmsg.Request:
222225
err := m.negotiator.HandleIncomingQuoteRequest(*msg)
223226
if err != nil {
224-
return fmt.Errorf("error handling incoming quote request: %w",
225-
err)
227+
return fmt.Errorf("error handling incoming quote "+
228+
"request: %w", err)
226229
}
227230
}
228231

@@ -234,11 +237,12 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error {
234237
// Perform type specific handling of the outgoing message.
235238
switch msg := outgoingMsg.(type) {
236239
case *rfqmsg.Accept:
237-
// Inform the HTLC order handler that we've accepted the quote request.
238-
// TODO(ffranr): set the asset amount correctly
240+
// Inform the HTLC order handler that we've accepted the quote
241+
// request.
239242
err := m.orderHandler.RegisterChannelRemit(*msg)
240243
if err != nil {
241-
return fmt.Errorf("error registering channel remit: %w", err)
244+
return fmt.Errorf("error registering channel remit: %w",
245+
err)
242246
}
243247
}
244248

@@ -258,26 +262,29 @@ func (m *Manager) mainEventLoop() {
258262
select {
259263
// Handle incoming message.
260264
case incomingMsg := <-m.incomingMessages:
261-
log.Debugf("RFQ manager has received an incoming message")
265+
log.Debugf("RFQ manager has received an incoming " +
266+
"message")
262267

263268
err := m.handleIncomingMessage(incomingMsg)
264269
if err != nil {
265-
log.Warnf("Error handling incoming quote "+
266-
"request: %v", err)
270+
log.Warnf("Error handling incoming message: %v",
271+
err)
267272
}
268273

269274
// Handle outgoing message.
270275
case outgoingMsg := <-m.outgoingMessages:
271-
log.Debugf("RFQ manager has received an outgoing message.")
276+
log.Debugf("RFQ manager has received an outgoing " +
277+
"message.")
272278

273279
err := m.handleOutgoingMessage(outgoingMsg)
274280
if err != nil {
275-
log.Warnf("Error handling outgoing message: %v", err)
281+
log.Warnf("Error handling outgoing message: %v",
282+
err)
276283
}
277284

278285
case <-m.Quit:
279-
log.Debug("RFQ manager main event loop has received the " +
280-
"shutdown signal")
286+
log.Debug("RFQ manager main event loop has received " +
287+
"the shutdown signal")
281288
return
282289
}
283290
}

0 commit comments

Comments
 (0)