Skip to content

Commit 88ebebd

Browse files
committed
WIP: add asset group key field to QuoteRequest
1 parent c44a9cd commit 88ebebd

File tree

7 files changed

+421
-0
lines changed

7 files changed

+421
-0
lines changed

chain_bridge.go

+9
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,15 @@ func (l *LndRpcChainBridge) EstimateFee(ctx context.Context,
200200
return l.lnd.WalletKit.EstimateFeeRate(ctx, int32(confTarget))
201201
}
202202

203+
// SubscribeCustomMessages creates a subscription to custom messages received
204+
// from our peers.
205+
func (l *LndRpcChainBridge) SubscribeCustomMessages(
206+
ctx context.Context) (<-chan lndclient.CustomMessage,
207+
<-chan error, error) {
208+
209+
return l.lnd.Client.SubscribeCustomMessages(ctx)
210+
}
211+
203212
// A compile time assertion to ensure LndRpcChainBridge meets the
204213
// tapgarden.ChainBridge interface.
205214
var _ tapgarden.ChainBridge = (*LndRpcChainBridge)(nil)

config.go

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/lightninglabs/taproot-assets/address"
1111
"github.com/lightninglabs/taproot-assets/monitoring"
1212
"github.com/lightninglabs/taproot-assets/proof"
13+
"github.com/lightninglabs/taproot-assets/rfq"
1314
"github.com/lightninglabs/taproot-assets/tapdb"
1415
"github.com/lightninglabs/taproot-assets/tapfreighter"
1516
"github.com/lightninglabs/taproot-assets/tapgarden"
@@ -122,6 +123,8 @@ type Config struct {
122123

123124
UniverseFederation *universe.FederationEnvoy
124125

126+
RfqManager *rfq.Manager
127+
125128
UniverseStats universe.Telemetry
126129

127130
// UniversePublicAccess is flag which, If true, and the Universe server

rfq/log.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package rfq
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
)
6+
7+
// Subsystem defines the logging code for this subsystem.
8+
const Subsystem = "RFQ"
9+
10+
// log is a logger that is initialized with no output filters. This
11+
// means the package will not perform any logging by default until the caller
12+
// requests it.
13+
var log = btclog.Disabled
14+
15+
// DisableLog disables all library log output. Logging output is disabled
16+
// by default until UseLogger is called.
17+
func DisableLog() {
18+
UseLogger(btclog.Disabled)
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

rfq/rfq.go

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package rfq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/lightninglabs/taproot-assets/fn"
9+
)
10+
11+
type ManagerCfg struct {
12+
PeerMessagePorter PeerMessagePorter
13+
}
14+
15+
// Manager is a struct that handles RFQ order management.
16+
type Manager struct {
17+
startOnce sync.Once
18+
stopOnce sync.Once
19+
20+
cfg ManagerCfg
21+
22+
rfqStreamHandle *StreamHandler
23+
24+
// ContextGuard provides a wait group and main quit channel that can be
25+
// used to create guarded contexts.
26+
*fn.ContextGuard
27+
}
28+
29+
func NewManager(cfg ManagerCfg) (Manager, error) {
30+
return Manager{
31+
cfg: cfg,
32+
}, nil
33+
}
34+
35+
// Start attempts to start a new RFQ manager.
36+
func (m *Manager) Start() error {
37+
var startErr error
38+
m.startOnce.Do(func() {
39+
ctx, cancel := m.WithCtxQuitNoTimeout()
40+
defer cancel()
41+
42+
log.Info("Initializing RFQ subsystems")
43+
err := m.initSubsystems(ctx)
44+
if err != nil {
45+
startErr = err
46+
return
47+
}
48+
49+
// Start the manager's main event loop in a separate goroutine.
50+
m.Wg.Add(1)
51+
go func() {
52+
defer m.Wg.Done()
53+
54+
log.Info("Starting RFQ manager main event loop")
55+
err = m.mainEventLoop()
56+
if err != nil {
57+
startErr = err
58+
return
59+
}
60+
}()
61+
})
62+
return startErr
63+
}
64+
65+
func (m *Manager) Stop() error {
66+
var stopErr error
67+
68+
m.stopOnce.Do(func() {
69+
log.Info("Stopping RFQ manager")
70+
71+
err := m.rfqStreamHandle.Stop()
72+
if err != nil {
73+
stopErr = fmt.Errorf("error stopping RFQ stream "+
74+
"handler: %w", err)
75+
return
76+
}
77+
})
78+
79+
return stopErr
80+
}
81+
82+
func (m *Manager) initSubsystems(ctx context.Context) error {
83+
var err error
84+
85+
// Initialise the RFQ raw message stream handler and start it in a
86+
// separate goroutine.
87+
m.rfqStreamHandle, err = NewStreamHandler(ctx, m.cfg.PeerMessagePorter)
88+
if err != nil {
89+
return fmt.Errorf("failed to create RFQ stream handler: %w",
90+
err)
91+
}
92+
93+
m.Wg.Add(1)
94+
go func() {
95+
defer m.Wg.Done()
96+
97+
log.Info("Starting RFQ stream handler")
98+
err = m.rfqStreamHandle.Start()
99+
if err != nil {
100+
return
101+
}
102+
}()
103+
104+
return nil
105+
}
106+
107+
func (m *Manager) mainEventLoop() error {
108+
for {
109+
select {
110+
// Handle RFQ message stream events.
111+
case quoteReq := <-m.rfqStreamHandle.IncomingQuoteRequests.NewItemCreated.ChanOut():
112+
log.Debugf("Received RFQ quote request from message "+
113+
"stream handler: %v", quoteReq)
114+
// TODO(ffranr): send to negotiator (+ price oracle)
115+
116+
case errStream := <-m.rfqStreamHandle.ErrChan:
117+
return fmt.Errorf("error received from RFQ stream "+
118+
"handler: %w", errStream)
119+
120+
case <-m.Quit:
121+
return nil
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)