Skip to content

Commit 5df53cd

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent 6408ec8 commit 5df53cd

File tree

3 files changed

+411
-0
lines changed

3 files changed

+411
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTFNS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/lightninglabs/loop/swapserverrpc"
9+
"google.golang.org/grpc"
10+
)
11+
12+
// NotificationType is the type of notification that the manager can handle.
13+
type NotificationType int
14+
15+
const (
16+
// NotificationTypeUnknown is the default notification type.
17+
NotificationTypeUnknown NotificationType = iota
18+
19+
// NotificationTypeReservation is the notification type for reservation
20+
// notifications.
21+
NotificationTypeReservation
22+
)
23+
24+
// Client is the interface that the notification manager needs to implement in
25+
// order to be able to subscribe to notifications.
26+
type Client interface {
27+
// SubscribeNotifications subscribes to the notifications from the server.
28+
SubscribeNotifications(ctx context.Context,
29+
in *swapserverrpc.SubscribeNotificationsRequest,
30+
opts ...grpc.CallOption) (
31+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
32+
}
33+
34+
// Config contains all the services that the notification manager needs to
35+
// operate.
36+
type Config struct {
37+
// Client is the client used to communicate with the swap server.
38+
Client Client
39+
40+
// FetchL402 is the function used to fetch the l402 token.
41+
FetchL402 func(context.Context) error
42+
}
43+
44+
// Manager is a manager for notifications that the swap server sends to the
45+
// client.
46+
type Manager struct {
47+
cfg *Config
48+
49+
hasL402 bool
50+
51+
subscribers map[NotificationType][]subscriber
52+
sync.Mutex
53+
}
54+
55+
// NewManager creates a new notification manager.
56+
func NewManager(cfg *Config) *Manager {
57+
return &Manager{
58+
cfg: cfg,
59+
subscribers: make(map[NotificationType][]subscriber),
60+
}
61+
}
62+
63+
type subscriber struct {
64+
subCtx context.Context
65+
recvChan interface{}
66+
}
67+
68+
// SubscribeReservations subscribes to the reservation notifications.
69+
func (m *Manager) SubscribeReservations(ctx context.Context,
70+
) <-chan *swapserverrpc.ServerReservationNotification {
71+
72+
notifChan := make(chan *swapserverrpc.ServerReservationNotification, 1)
73+
sub := subscriber{
74+
subCtx: ctx,
75+
recvChan: notifChan,
76+
}
77+
78+
m.addSubscriber(NotificationTypeReservation, sub)
79+
80+
// Start a goroutine to remove the subscriber when the context is canceled
81+
go func() {
82+
<-ctx.Done()
83+
m.removeSubscriber(NotificationTypeReservation, sub)
84+
close(notifChan)
85+
}()
86+
87+
return notifChan
88+
}
89+
90+
// Run starts the notification manager. It will keep on running until the
91+
// context is canceled. It will subscribe to notifications and forward them to
92+
// the subscribers. On a first successful connection to the server, it will
93+
// close the readyChan to signal that the manager is ready.
94+
func (m *Manager) Run(ctx context.Context) error {
95+
// Initially we want to immediately try to connect to the server.
96+
waitTime := time.Duration(0)
97+
98+
// Start the notification runloop.
99+
for {
100+
timer := time.NewTimer(waitTime)
101+
// Increase the wait time for the next iteration.
102+
waitTime += time.Second * 1
103+
104+
// Return if the context has been canceled.
105+
select {
106+
case <-ctx.Done():
107+
return nil
108+
109+
case <-timer.C:
110+
}
111+
112+
// In order to create a valid l402 we first are going to call
113+
// the FetchL402 method. As a client might not have outbound capacity
114+
// yet, we'll retry until we get a valid response.
115+
if !m.hasL402 {
116+
err := m.cfg.FetchL402(ctx)
117+
if err != nil {
118+
log.Errorf("Error fetching L402: %v", err)
119+
continue
120+
}
121+
m.hasL402 = true
122+
}
123+
124+
connectedFunc := func() {
125+
// Reset the wait time to 10 seconds.
126+
waitTime = time.Second * 10
127+
}
128+
129+
err := m.subscribeNotifications(ctx, connectedFunc)
130+
if err != nil {
131+
log.Errorf("Error subscribing to notifications: %v", err)
132+
}
133+
}
134+
}
135+
136+
// subscribeNotifications subscribes to the notifications from the server.
137+
func (m *Manager) subscribeNotifications(ctx context.Context,
138+
connectedFunc func()) error {
139+
140+
callCtx, cancel := context.WithCancel(ctx)
141+
defer cancel()
142+
143+
notifStream, err := m.cfg.Client.SubscribeNotifications(
144+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
145+
)
146+
if err != nil {
147+
return err
148+
}
149+
150+
// Signal that we're connected to the server.
151+
connectedFunc()
152+
log.Debugf("Successfully subscribed to server notifications")
153+
154+
for {
155+
notification, err := notifStream.Recv()
156+
if err == nil && notification != nil {
157+
log.Debugf("Received notification: %v", notification)
158+
m.handleNotification(notification)
159+
continue
160+
}
161+
162+
log.Errorf("Error receiving notification: %v", err)
163+
164+
return err
165+
}
166+
}
167+
168+
// handleNotification handles an incoming notification from the server,
169+
// forwarding it to the appropriate subscribers.
170+
func (m *Manager) handleNotification(notification *swapserverrpc.
171+
SubscribeNotificationsResponse) {
172+
173+
switch notification.Notification.(type) {
174+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
175+
// We'll forward the reservation notification to all subscribers.
176+
reservationNtfn := notification.GetReservationNotification()
177+
m.Lock()
178+
defer m.Unlock()
179+
180+
for _, sub := range m.subscribers[NotificationTypeReservation] {
181+
recvChan := sub.recvChan.(chan *swapserverrpc.
182+
ServerReservationNotification)
183+
184+
recvChan <- reservationNtfn
185+
}
186+
187+
default:
188+
log.Warnf("Received unknown notification type: %v",
189+
notification)
190+
}
191+
}
192+
193+
// addSubscriber adds a subscriber to the manager.
194+
func (m *Manager) addSubscriber(notifType NotificationType, sub subscriber) {
195+
m.Lock()
196+
defer m.Unlock()
197+
m.subscribers[notifType] = append(m.subscribers[notifType], sub)
198+
}
199+
200+
// removeSubscriber removes a subscriber from the manager.
201+
func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) {
202+
m.Lock()
203+
defer m.Unlock()
204+
subs := m.subscribers[notifType]
205+
newSubs := make([]subscriber, 0, len(subs))
206+
for _, s := range subs {
207+
if s != sub {
208+
newSubs = append(newSubs, s)
209+
}
210+
}
211+
m.subscribers[notifType] = newSubs
212+
}

0 commit comments

Comments
 (0)