-
Notifications
You must be signed in to change notification settings - Fork 185
/
Copy pathaccount_statuses_provider.go
187 lines (156 loc) · 6.01 KB
/
account_statuses_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package data_providers
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/counters"
)
// accountStatusesArguments contains the arguments required for subscribing to account statuses
type accountStatusesArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
}
type AccountStatusesDataProvider struct {
*baseDataProvider
logger zerolog.Logger
stateStreamApi state_stream.API
heartbeatInterval uint64
}
var _ DataProvider = (*AccountStatusesDataProvider)(nil)
// NewAccountStatusesDataProvider creates a new instance of AccountStatusesDataProvider.
func NewAccountStatusesDataProvider(
ctx context.Context,
logger zerolog.Logger,
stateStreamApi state_stream.API,
subscriptionID string,
topic string,
arguments models.Arguments,
send chan<- interface{},
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
heartbeatInterval uint64,
) (*AccountStatusesDataProvider, error) {
if stateStreamApi == nil {
return nil, fmt.Errorf("this access node does not support streaming account statuses")
}
p := &AccountStatusesDataProvider{
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(),
stateStreamApi: stateStreamApi,
heartbeatInterval: heartbeatInterval,
}
// Initialize arguments passed to the provider.
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig)
if err != nil {
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
}
subCtx, cancel := context.WithCancel(ctx)
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
)
return p, nil
}
// Run starts processing the subscription for events and handles responses.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) Run() error {
messageIndex := counters.NewMonotonicCounter(0)
blocksSinceLastMessage := uint64(0)
return run(
p.closedChan,
p.subscription,
func(response *backend.AccountStatusesResponse) error {
return p.sendResponse(response, &messageIndex, &blocksSinceLastMessage)
},
)
}
// sendResponse processes an account statuses message and sends it to data provider's channel.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) sendResponse(
response *backend.AccountStatusesResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send.
// The block counter increments until either:
// 1. The account emits events
// 2. The heartbeat interval is reached
*blocksSinceLastMessage += 1
accountEmittedEvents := len(response.AccountEvents) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.heartbeatInterval
if !accountEmittedEvents && !reachedHeartbeatLimit {
return nil
}
var accountStatusesPayload models.AccountStatusesResponse
defer messageIndex.Increment()
accountStatusesPayload.Build(response, messageIndex.Value())
var resp models.BaseDataProvidersResponse
resp.Build(p.ID(), p.Topic(), &accountStatusesPayload)
p.send <- &resp
*blocksSinceLastMessage = 0
return nil
}
// createSubscription creates a new subscription using the specified input arguments.
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
if args.StartBlockID != flow.ZeroID {
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter)
}
if args.StartBlockHeight != request.EmptyHeight {
return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, args.StartBlockHeight, args.Filter)
}
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
}
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseAccountStatusesArguments(
arguments models.Arguments,
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
) (accountStatusesArguments, error) {
var args accountStatusesArguments
// Parse block arguments
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
if err != nil {
return args, err
}
args.StartBlockID = startBlockID
args.StartBlockHeight = startBlockHeight
// Parse 'event_types' as a JSON array
var eventTypes parser.EventTypes
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
result, ok := eventTypesIn.([]string)
if !ok {
return args, fmt.Errorf("'event_types' must be an array of string")
}
err := eventTypes.Parse(result)
if err != nil {
return args, fmt.Errorf("invalid 'event_types': %w", err)
}
}
// Parse 'accountAddresses' as []string{}
var accountAddresses []string
if accountAddressesIn, ok := arguments["account_addresses"]; ok && accountAddressesIn != "" {
accountAddresses, ok = accountAddressesIn.([]string)
if !ok {
return args, fmt.Errorf("'account_addresses' must be an array of string")
}
}
// Initialize the event filter with the parsed arguments
args.Filter, err = state_stream.NewAccountStatusFilter(eventFilterConfig, chain, eventTypes.Flow(), accountAddresses)
if err != nil {
return args, fmt.Errorf("failed to create event filter: %w", err)
}
return args, nil
}