-
Notifications
You must be signed in to change notification settings - Fork 185
/
Copy pathbase_provider.go
101 lines (89 loc) · 2.35 KB
/
base_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package data_providers
import (
"context"
"fmt"
"sync"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
)
// baseDataProvider holds common objects for the provider
type baseDataProvider struct {
subscriptionID string
topic string
arguments models.Arguments
cancel context.CancelFunc
send chan<- interface{}
subscription subscription.Subscription
// Ensures the closedChan has been closed once.
closedFlag sync.Once
closedChan chan struct{}
}
// newBaseDataProvider creates a new instance of baseDataProvider.
func newBaseDataProvider(
subscriptionID string,
topic string,
arguments models.Arguments,
cancel context.CancelFunc,
send chan<- interface{},
subscription subscription.Subscription,
) *baseDataProvider {
return &baseDataProvider{
subscriptionID: subscriptionID,
topic: topic,
arguments: arguments,
cancel: cancel,
send: send,
subscription: subscription,
closedFlag: sync.Once{},
closedChan: make(chan struct{}, 1),
}
}
// ID returns the subscription ID associated with current data provider
func (b *baseDataProvider) ID() string {
return b.subscriptionID
}
// Topic returns the topic associated with the data provider.
func (b *baseDataProvider) Topic() string {
return b.topic
}
// Arguments returns the arguments associated with the data provider.
func (b *baseDataProvider) Arguments() models.Arguments {
return b.arguments
}
// Close terminates the data provider.
//
// No errors are expected during normal operations.
func (b *baseDataProvider) Close() {
b.cancel()
b.closedFlag.Do(func() {
close(b.closedChan)
})
}
type sendResponseCallback[T any] func(T) error
func run[T any](
closedChan <-chan struct{},
subscription subscription.Subscription,
sendResponse sendResponseCallback[T],
) error {
for {
select {
case <-closedChan:
return nil
case value, ok := <-subscription.Channel():
if !ok {
if subscription.Err() != nil {
return fmt.Errorf("subscription finished with error: %w", subscription.Err())
}
return nil
}
response, ok := value.(T)
if !ok {
return fmt.Errorf("unexpected response type: %T", value)
}
err := sendResponse(response)
if err != nil {
return fmt.Errorf("error sending response: %w", err)
}
}
}
}