Skip to content

Commit ba5fc1b

Browse files
committed
rpc: add RPC endpoints for the RFQ system
This commit adds the RFQ RPC server. It also adds four RPC endpoints: 1. Upsert buy order. 2. Upsert sell offer. 3. List accepted quotes. 4. Subscribe to RFQ events.
1 parent fa70fb4 commit ba5fc1b

10 files changed

+3119
-2
lines changed

perms/perms.go

+16
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,22 @@ var (
212212
Entity: "universe",
213213
Action: "read",
214214
}},
215+
"/rfqrpc.Rfq/AddAssetBuyOrder": {{
216+
Entity: "rfq",
217+
Action: "write",
218+
}},
219+
"/rfqrpc.Rfq/AddAssetSellOffer": {{
220+
Entity: "rfq",
221+
Action: "write",
222+
}},
223+
"/rfqrpc.Rfq/QueryRfqAcceptedQuotes": {{
224+
Entity: "rfq",
225+
Action: "read",
226+
}},
227+
"/rfqrpc.Rfq/SubscribeRfqEventNtfns": {{
228+
Entity: "rfq",
229+
Action: "write",
230+
}},
215231
"/tapdevrpc.TapDev/ImportProof": {{
216232
Entity: "proofs",
217233
Action: "write",

rpcserver.go

+318
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ import (
2828
"github.com/lightninglabs/taproot-assets/fn"
2929
"github.com/lightninglabs/taproot-assets/mssmt"
3030
"github.com/lightninglabs/taproot-assets/proof"
31+
"github.com/lightninglabs/taproot-assets/rfq"
32+
"github.com/lightninglabs/taproot-assets/rfqmsg"
3133
"github.com/lightninglabs/taproot-assets/rpcperms"
3234
"github.com/lightninglabs/taproot-assets/tapfreighter"
3335
"github.com/lightninglabs/taproot-assets/tapgarden"
3436
"github.com/lightninglabs/taproot-assets/tappsbt"
3537
"github.com/lightninglabs/taproot-assets/taprpc"
3638
wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
3739
"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
40+
"github.com/lightninglabs/taproot-assets/taprpc/rfqrpc"
3841
"github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc"
3942
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
4043
"github.com/lightninglabs/taproot-assets/tapscript"
@@ -43,6 +46,8 @@ import (
4346
"github.com/lightningnetwork/lnd/keychain"
4447
"github.com/lightningnetwork/lnd/lnrpc"
4548
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
49+
"github.com/lightningnetwork/lnd/lnwire"
50+
"github.com/lightningnetwork/lnd/routing/route"
4651
"github.com/lightningnetwork/lnd/signal"
4752
"golang.org/x/time/rate"
4853
"google.golang.org/grpc"
@@ -104,6 +109,7 @@ type rpcServer struct {
104109
taprpc.UnimplementedTaprootAssetsServer
105110
wrpc.UnimplementedAssetWalletServer
106111
mintrpc.UnimplementedMintServer
112+
rfqrpc.UnimplementedRfqServer
107113
unirpc.UnimplementedUniverseServer
108114
tapdevrpc.UnimplementedTapDevServer
109115

@@ -176,6 +182,7 @@ func (r *rpcServer) RegisterWithGrpcServer(grpcServer *grpc.Server) error {
176182
taprpc.RegisterTaprootAssetsServer(grpcServer, r)
177183
wrpc.RegisterAssetWalletServer(grpcServer, r)
178184
mintrpc.RegisterMintServer(grpcServer, r)
185+
rfqrpc.RegisterRfqServer(grpcServer, r)
179186
unirpc.RegisterUniverseServer(grpcServer, r)
180187
tapdevrpc.RegisterGrpcServer(grpcServer, r)
181188
return nil
@@ -4749,3 +4756,314 @@ func MarshalAssetFedSyncCfg(
47494756
AllowSyncExport: config.AllowSyncExport,
47504757
}, nil
47514758
}
4759+
4760+
// unmarshalAssetSpecifier unmarshals an asset specifier from the RPC form.
4761+
func unmarshalAssetSpecifier(req *rfqrpc.AssetSpecifier) (*asset.ID,
4762+
*btcec.PublicKey, error) {
4763+
4764+
// Attempt to decode the asset specifier from the RPC request. In cases
4765+
// where both the asset ID and asset group key are provided, we will
4766+
// give precedence to the asset ID due to its higher level of
4767+
// specificity.
4768+
var (
4769+
assetID *asset.ID
4770+
4771+
groupKeyBytes []byte
4772+
groupKey *btcec.PublicKey
4773+
4774+
err error
4775+
)
4776+
4777+
switch {
4778+
// Parse the asset ID if it's set.
4779+
case len(req.GetAssetId()) > 0:
4780+
var assetIdBytes [32]byte
4781+
copy(assetIdBytes[:], req.GetAssetId())
4782+
id := asset.ID(assetIdBytes)
4783+
assetID = &id
4784+
4785+
case len(req.GetAssetIdStr()) > 0:
4786+
assetIDBytes, err := hex.DecodeString(req.GetAssetIdStr())
4787+
if err != nil {
4788+
return nil, nil, fmt.Errorf("error decoding asset "+
4789+
"ID: %w", err)
4790+
}
4791+
4792+
var id asset.ID
4793+
copy(id[:], assetIDBytes)
4794+
assetID = &id
4795+
4796+
// Parse the group key if it's set.
4797+
case len(req.GetGroupKey()) > 0:
4798+
groupKeyBytes = req.GetGroupKey()
4799+
groupKey, err = btcec.ParsePubKey(groupKeyBytes)
4800+
if err != nil {
4801+
return nil, nil, fmt.Errorf("error parsing group "+
4802+
"key: %w", err)
4803+
}
4804+
4805+
case len(req.GetGroupKeyStr()) > 0:
4806+
groupKeyBytes, err := hex.DecodeString(
4807+
req.GetGroupKeyStr(),
4808+
)
4809+
if err != nil {
4810+
return nil, nil, fmt.Errorf("error decoding group "+
4811+
"key: %w", err)
4812+
}
4813+
4814+
groupKey, err = btcec.ParsePubKey(groupKeyBytes)
4815+
if err != nil {
4816+
return nil, nil, fmt.Errorf("error parsing group "+
4817+
"key: %w", err)
4818+
}
4819+
4820+
default:
4821+
// At this point, we know that neither the asset ID nor the
4822+
// group key are specified. Return an error.
4823+
return nil, nil, fmt.Errorf("either asset ID or asset group " +
4824+
"key must be specified")
4825+
}
4826+
4827+
return assetID, groupKey, nil
4828+
}
4829+
4830+
// unmarshalAssetBuyOrder unmarshals an asset buy order from the RPC form.
4831+
func unmarshalAssetBuyOrder(
4832+
req *rfqrpc.AddAssetBuyOrderRequest) (*rfq.BuyOrder, error) {
4833+
4834+
assetId, assetGroupKey, err := unmarshalAssetSpecifier(
4835+
req.AssetSpecifier,
4836+
)
4837+
if err != nil {
4838+
return nil, fmt.Errorf("error unmarshalling asset specifier: "+
4839+
"%w", err)
4840+
}
4841+
4842+
// Unmarshal the peer if specified.
4843+
var peer *route.Vertex
4844+
if len(req.PeerPubKey) > 0 {
4845+
pv, err := route.NewVertexFromBytes(req.PeerPubKey)
4846+
if err != nil {
4847+
return nil, fmt.Errorf("error unmarshalling peer "+
4848+
"route vertex: %w", err)
4849+
}
4850+
4851+
peer = &pv
4852+
}
4853+
4854+
return &rfq.BuyOrder{
4855+
AssetID: assetId,
4856+
AssetGroupKey: assetGroupKey,
4857+
MinAssetAmount: req.MinAssetAmount,
4858+
MaxBid: lnwire.MilliSatoshi(req.MaxBid),
4859+
Expiry: req.Expiry,
4860+
Peer: peer,
4861+
}, nil
4862+
}
4863+
4864+
// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
4865+
// manager. If the order already exists for the given asset, it will be updated.
4866+
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
4867+
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
4868+
error) {
4869+
4870+
// Unmarshal the buy order from the RPC form.
4871+
buyOrder, err := unmarshalAssetBuyOrder(req)
4872+
if err != nil {
4873+
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
4874+
}
4875+
4876+
var peer string
4877+
if buyOrder.Peer != nil {
4878+
peer = buyOrder.Peer.String()
4879+
}
4880+
rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
4881+
"(dest_peer=%s)", peer)
4882+
4883+
// Upsert the buy order into the RFQ manager.
4884+
err = r.cfg.RfqManager.UpsertAssetBuyOrder(*buyOrder)
4885+
if err != nil {
4886+
return nil, fmt.Errorf("error upserting buy order into RFQ "+
4887+
"manager: %w", err)
4888+
}
4889+
4890+
return &rfqrpc.AddAssetBuyOrderResponse{}, nil
4891+
}
4892+
4893+
// AddAssetSellOffer upserts a new sell offer for the given asset into the
4894+
// RFQ manager. If the offer already exists for the given asset, it will be
4895+
// updated.
4896+
func (r *rpcServer) AddAssetSellOffer(_ context.Context,
4897+
req *rfqrpc.AddAssetSellOfferRequest) (*rfqrpc.AddAssetSellOfferResponse,
4898+
error) {
4899+
4900+
// Unmarshal the sell offer from the RPC form.
4901+
assetID, assetGroupKey, err := unmarshalAssetSpecifier(
4902+
req.AssetSpecifier,
4903+
)
4904+
if err != nil {
4905+
return nil, fmt.Errorf("error unmarshalling asset specifier: "+
4906+
"%w", err)
4907+
}
4908+
4909+
sellOffer := &rfq.SellOffer{
4910+
AssetID: assetID,
4911+
AssetGroupKey: assetGroupKey,
4912+
MaxUnits: req.MaxUnits,
4913+
}
4914+
4915+
rpcsLog.Debugf("[AddAssetSellOffer]: upserting sell offer "+
4916+
"(sell_offer=%v)", sellOffer)
4917+
4918+
// Upsert the sell offer into the RFQ manager.
4919+
err = r.cfg.RfqManager.UpsertAssetSellOffer(*sellOffer)
4920+
if err != nil {
4921+
return nil, fmt.Errorf("error upserting sell offer into RFQ "+
4922+
"manager: %w", err)
4923+
}
4924+
4925+
return &rfqrpc.AddAssetSellOfferResponse{}, nil
4926+
}
4927+
4928+
// marshalAcceptedQuotes marshals a map of accepted quotes into the RPC form.
4929+
func marshalAcceptedQuotes(
4930+
acceptedQuotes map[rfq.SerialisedScid]rfqmsg.Accept) []*rfqrpc.AcceptedQuote {
4931+
4932+
// Marshal the accepted quotes into the RPC form.
4933+
rpcQuotes := make([]*rfqrpc.AcceptedQuote, 0, len(acceptedQuotes))
4934+
for scid, quote := range acceptedQuotes {
4935+
rpcQuote := &rfqrpc.AcceptedQuote{
4936+
Peer: quote.Peer.String(),
4937+
Id: quote.ID[:],
4938+
Scid: uint64(scid),
4939+
AssetAmount: quote.AssetAmount,
4940+
AskPrice: uint64(quote.AskPrice),
4941+
Expiry: quote.Expiry,
4942+
}
4943+
rpcQuotes = append(rpcQuotes, rpcQuote)
4944+
}
4945+
4946+
return rpcQuotes
4947+
}
4948+
4949+
// QueryRfqAcceptedQuotes queries for accepted quotes from the RFQ system.
4950+
func (r *rpcServer) QueryRfqAcceptedQuotes(_ context.Context,
4951+
_ *rfqrpc.QueryRfqAcceptedQuotesRequest) (
4952+
*rfqrpc.QueryRfqAcceptedQuotesResponse, error) {
4953+
4954+
// Query the RFQ manager for accepted quotes.
4955+
acceptedQuotes := r.cfg.RfqManager.QueryAcceptedQuotes()
4956+
4957+
rpcQuotes := marshalAcceptedQuotes(acceptedQuotes)
4958+
4959+
return &rfqrpc.QueryRfqAcceptedQuotesResponse{
4960+
AcceptedQuotes: rpcQuotes,
4961+
}, nil
4962+
}
4963+
4964+
// marshallRfqEvent marshals an RFQ event into the RPC form.
4965+
func marshallRfqEvent(eventInterface fn.Event) (*rfqrpc.RfqEvent, error) {
4966+
timestamp := eventInterface.Timestamp().UTC().Unix()
4967+
4968+
switch event := eventInterface.(type) {
4969+
case *rfq.IncomingAcceptQuoteEvent:
4970+
acceptedQuote := &rfqrpc.AcceptedQuote{
4971+
Peer: event.Peer.String(),
4972+
Id: event.ID[:],
4973+
Scid: uint64(event.ShortChannelId()),
4974+
AssetAmount: event.AssetAmount,
4975+
AskPrice: uint64(event.AskPrice),
4976+
Expiry: event.Expiry,
4977+
}
4978+
4979+
eventRpc := &rfqrpc.RfqEvent_IncomingAcceptQuote{
4980+
IncomingAcceptQuote: &rfqrpc.IncomingAcceptQuoteEvent{
4981+
Timestamp: uint64(timestamp),
4982+
AcceptedQuote: acceptedQuote,
4983+
},
4984+
}
4985+
return &rfqrpc.RfqEvent{
4986+
Event: eventRpc,
4987+
}, nil
4988+
4989+
case *rfq.AcceptHtlcEvent:
4990+
eventRpc := &rfqrpc.RfqEvent_AcceptHtlc{
4991+
AcceptHtlc: &rfqrpc.AcceptHtlcEvent{
4992+
Timestamp: uint64(timestamp),
4993+
Scid: uint64(event.ChannelRemit.Scid),
4994+
},
4995+
}
4996+
return &rfqrpc.RfqEvent{
4997+
Event: eventRpc,
4998+
}, nil
4999+
5000+
default:
5001+
return nil, fmt.Errorf("unknown RFQ event type: %T",
5002+
eventInterface)
5003+
}
5004+
}
5005+
5006+
// SubscribeRfqEventNtfns subscribes to RFQ event notifications.
5007+
func (r *rpcServer) SubscribeRfqEventNtfns(
5008+
_ *rfqrpc.SubscribeRfqEventNtfnsRequest,
5009+
ntfnStream rfqrpc.Rfq_SubscribeRfqEventNtfnsServer) error {
5010+
5011+
// Create a new event subscriber and pass a copy to the RFQ manager.
5012+
// We will then read events from the subscriber.
5013+
eventSubscriber := fn.NewEventReceiver[fn.Event](fn.DefaultQueueSize)
5014+
defer eventSubscriber.Stop()
5015+
5016+
// Register the subscriber with the ChainPorter.
5017+
err := r.cfg.RfqManager.RegisterSubscriber(
5018+
eventSubscriber, false, 0,
5019+
)
5020+
if err != nil {
5021+
return fmt.Errorf("failed to register RFQ manager event "+
5022+
"notifications subscription: %w", err)
5023+
}
5024+
5025+
for {
5026+
select {
5027+
// Handle new events from the subscriber.
5028+
case event := <-eventSubscriber.NewItemCreated.ChanOut():
5029+
// Marshal the event into its RPC form.
5030+
rpcEvent, err := marshallRfqEvent(event)
5031+
if err != nil {
5032+
return fmt.Errorf("failed to marshall RFQ "+
5033+
"event into RPC form: %w", err)
5034+
}
5035+
5036+
err = ntfnStream.Send(rpcEvent)
5037+
if err != nil {
5038+
return err
5039+
}
5040+
5041+
// Handle the case where the RPC stream is closed by the client.
5042+
case <-ntfnStream.Context().Done():
5043+
// Remove the subscriber from the RFQ manager.
5044+
err := r.cfg.RfqManager.RemoveSubscriber(
5045+
eventSubscriber,
5046+
)
5047+
if err != nil {
5048+
return fmt.Errorf("failed to remove RFQ "+
5049+
"manager event notifications "+
5050+
"subscription: %w", err)
5051+
}
5052+
5053+
// Don't return an error if a normal context
5054+
// cancellation has occurred.
5055+
isCanceledContext := errors.Is(
5056+
ntfnStream.Context().Err(), context.Canceled,
5057+
)
5058+
if isCanceledContext {
5059+
return nil
5060+
}
5061+
5062+
return ntfnStream.Context().Err()
5063+
5064+
// Handle the case where the RPC server is shutting down.
5065+
case <-r.quit:
5066+
return nil
5067+
}
5068+
}
5069+
}

taprpc/gen_protos.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ set -e
66
function generate() {
77
echo "Generating root gRPC server protos"
88

9-
PROTOS="taprootassets.proto assetwalletrpc/assetwallet.proto mintrpc/mint.proto universerpc/universe.proto tapdevrpc/tapdev.proto"
9+
PROTOS="taprootassets.proto assetwalletrpc/assetwallet.proto mintrpc/mint.proto rfqrpc/rfq.proto universerpc/universe.proto tapdevrpc/tapdev.proto"
1010

1111
# For each of the sub-servers, we then generate their protos, but a restricted
1212
# set as they don't yet require REST proxies, or swagger docs.
@@ -48,7 +48,7 @@ function generate() {
4848
--custom_opt="$opts" \
4949
taprootassets.proto
5050

51-
PACKAGES="assetwalletrpc universerpc mintrpc"
51+
PACKAGES="assetwalletrpc universerpc mintrpc rfqrpc"
5252
for package in $PACKAGES; do
5353

5454
opts="package_name=$package,manual_import=$manual_import,js_stubs=1"

0 commit comments

Comments
 (0)