Skip to content

Commit d5a249f

Browse files
authored
Merge pull request #7082 from onflow/illia-malachyn/7080-add-heatbeat-interval-to-providers
[Access] Add heartbeat interval to providers
2 parents 8e369c0 + 8b0bd4e commit d5a249f

12 files changed

+217
-51
lines changed

Diff for: engine/access/rest/websockets/data_providers/account_statuses_provider.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/onflow/flow-go/engine/access/rest/common/parser"
1212
"github.com/onflow/flow-go/engine/access/rest/http/request"
13+
"github.com/onflow/flow-go/engine/access/rest/util"
1314
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
1415
"github.com/onflow/flow-go/engine/access/state_stream"
1516
"github.com/onflow/flow-go/engine/access/state_stream/backend"
@@ -20,9 +21,10 @@ import (
2021

2122
// accountStatusesArguments contains the arguments required for subscribing to account statuses
2223
type accountStatusesArguments struct {
23-
StartBlockID flow.Identifier // ID of the block to start subscription from
24-
StartBlockHeight uint64 // Height of the block to start subscription from
25-
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
24+
StartBlockID flow.Identifier // ID of the block to start subscription from
25+
StartBlockHeight uint64 // Height of the block to start subscription from
26+
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
27+
HeartbeatInterval *uint64 // Maximum number of blocks message won't be sent. Nil if not set
2628
}
2729

2830
type AccountStatusesDataProvider struct {
@@ -64,6 +66,9 @@ func NewAccountStatusesDataProvider(
6466
if err != nil {
6567
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
6668
}
69+
if accountStatusesArgs.HeartbeatInterval != nil {
70+
p.heartbeatInterval = *accountStatusesArgs.HeartbeatInterval
71+
}
6772

6873
subCtx, cancel := context.WithCancel(ctx)
6974

@@ -140,10 +145,22 @@ func parseAccountStatusesArguments(
140145
chain flow.Chain,
141146
eventFilterConfig state_stream.EventFilterConfig,
142147
) (accountStatusesArguments, error) {
148+
allowedFields := []string{
149+
"start_block_id",
150+
"start_block_height",
151+
"event_types",
152+
"account_addresses",
153+
"heartbeat_interval",
154+
}
155+
err := ensureAllowedFields(arguments, allowedFields)
156+
if err != nil {
157+
return accountStatusesArguments{}, err
158+
}
159+
143160
var args accountStatusesArguments
144161

145162
// Parse block arguments
146-
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
163+
startBlockID, startBlockHeight, err := parseStartBlock(arguments)
147164
if err != nil {
148165
return args, err
149166
}
@@ -155,12 +172,12 @@ func parseAccountStatusesArguments(
155172
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
156173
result, ok := eventTypesIn.([]string)
157174
if !ok {
158-
return args, fmt.Errorf("'event_types' must be an array of string")
175+
return accountStatusesArguments{}, fmt.Errorf("'event_types' must be an array of string")
159176
}
160177

161178
err := eventTypes.Parse(result)
162179
if err != nil {
163-
return args, fmt.Errorf("invalid 'event_types': %w", err)
180+
return accountStatusesArguments{}, fmt.Errorf("invalid 'event_types': %w", err)
164181
}
165182
}
166183

@@ -169,14 +186,29 @@ func parseAccountStatusesArguments(
169186
if accountAddressesIn, ok := arguments["account_addresses"]; ok && accountAddressesIn != "" {
170187
accountAddresses, ok = accountAddressesIn.([]string)
171188
if !ok {
172-
return args, fmt.Errorf("'account_addresses' must be an array of string")
189+
return accountStatusesArguments{}, fmt.Errorf("'account_addresses' must be an array of string")
173190
}
174191
}
175192

193+
var heartbeatInterval uint64
194+
if heartbeatIntervalIn, ok := arguments["heartbeat_interval"]; ok && heartbeatIntervalIn != "" {
195+
result, ok := heartbeatIntervalIn.(string)
196+
if !ok {
197+
return accountStatusesArguments{}, fmt.Errorf("'heartbeat_interval' must be a string")
198+
}
199+
200+
heartbeatInterval, err = util.ToUint64(result)
201+
if err != nil {
202+
return accountStatusesArguments{}, fmt.Errorf("invalid 'heartbeat_interval': %w", err)
203+
}
204+
205+
args.HeartbeatInterval = &heartbeatInterval
206+
}
207+
176208
// Initialize the event filter with the parsed arguments
177209
args.Filter, err = state_stream.NewAccountStatusFilter(eventFilterConfig, chain, eventTypes.Flow(), accountAddresses)
178210
if err != nil {
179-
return args, fmt.Errorf("failed to create event filter: %w", err)
211+
return accountStatusesArguments{}, fmt.Errorf("failed to create event filter: %w", err)
180212
}
181213

182214
return args, nil
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package data_providers
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
func ensureAllowedFields(data map[string]interface{}, allowedFields []string) error {
8+
fieldsMap := make(map[string]bool, len(allowedFields))
9+
for _, field := range allowedFields {
10+
fieldsMap[field] = true
11+
}
12+
13+
for key := range data {
14+
if !fieldsMap[key] {
15+
return fmt.Errorf("unexpected field: '%s'", key)
16+
}
17+
}
18+
return nil
19+
}

Diff for: engine/access/rest/websockets/data_providers/block_digests_provider.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func NewBlockDigestsDataProvider(
3939
}
4040

4141
// Parse arguments passed to the provider.
42-
blockArgs, err := ParseBlocksArguments(arguments)
42+
blockArgs, err := parseBlocksArguments(arguments)
4343
if err != nil {
4444
return nil, fmt.Errorf("invalid arguments: %w", err)
4545
}

Diff for: engine/access/rest/websockets/data_providers/block_headers_provider.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func NewBlockHeadersDataProvider(
4040
}
4141

4242
// Parse arguments passed to the provider.
43-
blockArgs, err := ParseBlocksArguments(arguments)
43+
blockArgs, err := parseBlocksArguments(arguments)
4444
if err != nil {
4545
return nil, fmt.Errorf("invalid arguments: %w", err)
4646
}

Diff for: engine/access/rest/websockets/data_providers/blocks_provider.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func NewBlocksDataProvider(
5454

5555
// Parse arguments passed to the provider.
5656
var err error
57-
p.arguments, err = ParseBlocksArguments(arguments)
57+
p.arguments, err = parseBlocksArguments(arguments)
5858
if err != nil {
5959
return nil, fmt.Errorf("invalid arguments: %w", err)
6060
}
@@ -108,37 +108,47 @@ func (p *BlocksDataProvider) createSubscription(ctx context.Context, args blocks
108108
return p.api.SubscribeBlocksFromLatest(ctx, args.BlockStatus)
109109
}
110110

111-
// ParseBlocksArguments validates and initializes the blocks arguments.
112-
func ParseBlocksArguments(arguments models.Arguments) (blocksArguments, error) {
111+
// parseBlocksArguments validates and initializes the blocks arguments.
112+
func parseBlocksArguments(arguments models.Arguments) (blocksArguments, error) {
113+
allowedFields := []string{
114+
"start_block_id",
115+
"start_block_height",
116+
"block_status",
117+
}
118+
err := ensureAllowedFields(arguments, allowedFields)
119+
if err != nil {
120+
return blocksArguments{}, err
121+
}
122+
113123
var args blocksArguments
114124

115125
// Parse 'block_status'
116126
if blockStatusIn, ok := arguments["block_status"]; ok {
117127
result, ok := blockStatusIn.(string)
118128
if !ok {
119-
return args, fmt.Errorf("'block_status' must be string")
129+
return blocksArguments{}, fmt.Errorf("'block_status' must be string")
120130
}
121131
blockStatus, err := parser.ParseBlockStatus(result)
122132
if err != nil {
123-
return args, err
133+
return blocksArguments{}, err
124134
}
125135
args.BlockStatus = blockStatus
126136
} else {
127-
return args, fmt.Errorf("'block_status' must be provided")
137+
return blocksArguments{}, fmt.Errorf("'block_status' must be provided")
128138
}
129139

130140
// Parse block arguments
131-
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
141+
startBlockID, startBlockHeight, err := parseStartBlock(arguments)
132142
if err != nil {
133-
return args, err
143+
return blocksArguments{}, err
134144
}
135145
args.StartBlockID = startBlockID
136146
args.StartBlockHeight = startBlockHeight
137147

138148
return args, nil
139149
}
140150

141-
func ParseStartBlock(arguments models.Arguments) (flow.Identifier, uint64, error) {
151+
func parseStartBlock(arguments models.Arguments) (flow.Identifier, uint64, error) {
142152
startBlockIDIn, hasStartBlockID := arguments["start_block_id"]
143153
startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"]
144154

Diff for: engine/access/rest/websockets/data_providers/blocks_provider_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ func (s *BlocksProviderSuite) expectedBlockResponses(
217217
// 1. Missing 'block_status' argument.
218218
// 2. Invalid 'block_status' argument.
219219
// 3. Providing both 'start_block_id' and 'start_block_height' simultaneously.
220+
// 4. Providing unexpected argument.
220221
func (s *BlocksProviderSuite) TestBlocksDataProvider_InvalidArguments() {
221222
ctx := context.Background()
222223
send := make(chan interface{})
@@ -239,6 +240,7 @@ func (s *BlocksProviderSuite) TestBlocksDataProvider_InvalidArguments() {
239240
// 1. Missing the required 'block_status' argument.
240241
// 2. Providing an unknown or invalid 'block_status' value.
241242
// 3. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed.
243+
// 4. Providing unexpected argument.
242244
func (s *BlocksProviderSuite) invalidArgumentsTestCases() []testErrType {
243245
return []testErrType{
244246
{
@@ -264,5 +266,14 @@ func (s *BlocksProviderSuite) invalidArgumentsTestCases() []testErrType {
264266
},
265267
expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'",
266268
},
269+
{
270+
name: "unexpected argument",
271+
arguments: map[string]interface{}{
272+
"block_status": parser.Finalized,
273+
"start_block_id": unittest.BlockFixture().ID().String(),
274+
"unexpected_argument": "dummy",
275+
},
276+
expectedErrorMsg: "unexpected field: 'unexpected_argument'",
277+
},
267278
}
268279
}

Diff for: engine/access/rest/websockets/data_providers/events_provider.go

+43-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/onflow/flow-go/engine/access/rest/common/parser"
1010
"github.com/onflow/flow-go/engine/access/rest/http/request"
11+
"github.com/onflow/flow-go/engine/access/rest/util"
1112
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
1213
"github.com/onflow/flow-go/engine/access/state_stream"
1314
"github.com/onflow/flow-go/engine/access/state_stream/backend"
@@ -16,11 +17,12 @@ import (
1617
"github.com/onflow/flow-go/module/counters"
1718
)
1819

19-
// eventsArguments contains the arguments required for subscribing to events
20+
// eventsArguments contains the arguments a user passes to subscribe to events
2021
type eventsArguments struct {
21-
StartBlockID flow.Identifier // ID of the block to start subscription from
22-
StartBlockHeight uint64 // Height of the block to start subscription from
23-
Filter state_stream.EventFilter // Filter applied to events for a given subscription
22+
StartBlockID flow.Identifier // ID of the block to start subscription from
23+
StartBlockHeight uint64 // Height of the block to start subscription from
24+
Filter state_stream.EventFilter // Filter applied to events for a given subscription
25+
HeartbeatInterval *uint64 // Maximum number of blocks message won't be sent. Nil if not set
2426
}
2527

2628
// EventsDataProvider is responsible for providing events
@@ -63,6 +65,9 @@ func NewEventsDataProvider(
6365
if err != nil {
6466
return nil, fmt.Errorf("invalid arguments for events data provider: %w", err)
6567
}
68+
if eventArgs.HeartbeatInterval != nil {
69+
p.heartbeatInterval = *eventArgs.HeartbeatInterval
70+
}
6671

6772
subCtx, cancel := context.WithCancel(ctx)
6873

@@ -139,10 +144,23 @@ func parseEventsArguments(
139144
chain flow.Chain,
140145
eventFilterConfig state_stream.EventFilterConfig,
141146
) (eventsArguments, error) {
147+
allowedFields := []string{
148+
"start_block_id",
149+
"start_block_height",
150+
"event_types",
151+
"addresses",
152+
"contracts",
153+
"heartbeat_interval",
154+
}
155+
err := ensureAllowedFields(arguments, allowedFields)
156+
if err != nil {
157+
return eventsArguments{}, err
158+
}
159+
142160
var args eventsArguments
143161

144162
// Parse block arguments
145-
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
163+
startBlockID, startBlockHeight, err := parseStartBlock(arguments)
146164
if err != nil {
147165
return args, err
148166
}
@@ -154,12 +172,12 @@ func parseEventsArguments(
154172
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
155173
result, ok := eventTypesIn.([]string)
156174
if !ok {
157-
return args, fmt.Errorf("'event_types' must be an array of string")
175+
return eventsArguments{}, fmt.Errorf("'event_types' must be an array of string")
158176
}
159177

160178
err := eventTypes.Parse(result)
161179
if err != nil {
162-
return args, fmt.Errorf("invalid 'event_types': %w", err)
180+
return eventsArguments{}, fmt.Errorf("invalid 'event_types': %w", err)
163181
}
164182
}
165183

@@ -168,7 +186,7 @@ func parseEventsArguments(
168186
if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" {
169187
addresses, ok = addressesIn.([]string)
170188
if !ok {
171-
return args, fmt.Errorf("'addresses' must be an array of string")
189+
return eventsArguments{}, fmt.Errorf("'addresses' must be an array of string")
172190
}
173191
}
174192

@@ -177,14 +195,29 @@ func parseEventsArguments(
177195
if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" {
178196
contracts, ok = contractsIn.([]string)
179197
if !ok {
180-
return args, fmt.Errorf("'contracts' must be an array of string")
198+
return eventsArguments{}, fmt.Errorf("'contracts' must be an array of string")
181199
}
182200
}
183201

202+
var heartbeatInterval uint64
203+
if heartbeatIntervalIn, ok := arguments["heartbeat_interval"]; ok && heartbeatIntervalIn != "" {
204+
result, ok := heartbeatIntervalIn.(string)
205+
if !ok {
206+
return eventsArguments{}, fmt.Errorf("'heartbeat_interval' must be a string")
207+
}
208+
209+
heartbeatInterval, err = util.ToUint64(result)
210+
if err != nil {
211+
return eventsArguments{}, fmt.Errorf("invalid 'heartbeat_interval': %w", err)
212+
}
213+
214+
args.HeartbeatInterval = &heartbeatInterval
215+
}
216+
184217
// Initialize the event filter with the parsed arguments
185218
args.Filter, err = state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes.Flow(), addresses, contracts)
186219
if err != nil {
187-
return args, fmt.Errorf("failed to create event filter: %w", err)
220+
return eventsArguments{}, fmt.Errorf("failed to create event filter: %w", err)
188221
}
189222

190223
return args, nil

Diff for: engine/access/rest/websockets/data_providers/events_provider_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,8 @@ func (s *EventsProviderSuite) TestEventsDataProvider_StateStreamNotConfigured()
330330
// 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed.
331331
// 2. Providing invalid 'start_block_id' value.
332332
// 3. Providing invalid 'start_block_height' value.
333+
// 4. Providing invalid 'heartbeat_interval' value.
334+
// 5. Providing unexpected argument.
333335
func invalidArgumentsTestCases() []testErrType {
334336
return []testErrType{
335337
{
@@ -354,5 +356,20 @@ func invalidArgumentsTestCases() []testErrType {
354356
},
355357
expectedErrorMsg: "value must be an unsigned 64 bit integer",
356358
},
359+
{
360+
name: "invalid 'heartbeat_interval' argument",
361+
arguments: map[string]interface{}{
362+
"heartbeat_interval": "-1",
363+
},
364+
expectedErrorMsg: "value must be an unsigned 64 bit integer",
365+
},
366+
{
367+
name: "unexpected argument",
368+
arguments: map[string]interface{}{
369+
"start_block_id": unittest.BlockFixture().ID().String(),
370+
"unexpected_argument": "dummy",
371+
},
372+
expectedErrorMsg: "unexpected field: 'unexpected_argument'",
373+
},
357374
}
358375
}

0 commit comments

Comments
 (0)