-
Notifications
You must be signed in to change notification settings - Fork 185
/
Copy pathblocks_provider.go
180 lines (153 loc) · 5.45 KB
/
blocks_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package data_providers
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/access"
commonmodels "github.com/onflow/flow-go/engine/access/rest/common/models"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
)
// BlocksArguments contains the arguments required for subscribing to blocks / block headers / block digests
type blocksArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
BlockStatus flow.BlockStatus // Status of blocks to subscribe to
}
// BlocksDataProvider is responsible for providing blocks
type BlocksDataProvider struct {
*baseDataProvider
logger zerolog.Logger
api access.API
arguments blocksArguments
linkGenerator commonmodels.LinkGenerator
}
var _ DataProvider = (*BlocksDataProvider)(nil)
// NewBlocksDataProvider creates a new instance of BlocksDataProvider.
func NewBlocksDataProvider(
ctx context.Context,
logger zerolog.Logger,
api access.API,
subscriptionID string,
linkGenerator commonmodels.LinkGenerator,
topic string,
arguments models.Arguments,
send chan<- interface{},
) (*BlocksDataProvider, error) {
p := &BlocksDataProvider{
logger: logger.With().Str("component", "blocks-data-provider").Logger(),
api: api,
linkGenerator: linkGenerator,
}
// Parse arguments passed to the provider.
var err error
p.arguments, err = ParseBlocksArguments(arguments)
if err != nil {
return nil, fmt.Errorf("invalid arguments: %w", err)
}
subCtx, cancel := context.WithCancel(ctx)
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, p.arguments), // Set up a subscription to blocks based on arguments.
)
return p, nil
}
// Run starts processing the subscription for blocks and handles responses.
//
// No errors are expected during normal operations.
func (p *BlocksDataProvider) Run() error {
return run(
p.closedChan,
p.subscription,
func(block *flow.Block) error {
var blockResponse commonmodels.Block
expandPayload := map[string]bool{commonmodels.ExpandableFieldPayload: true}
err := blockResponse.Build(block, nil, p.linkGenerator, p.arguments.BlockStatus, expandPayload)
if err != nil {
return fmt.Errorf("failed to build block response :%w", err)
}
var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &blockResponse)
p.send <- &response
return nil
},
)
}
// createSubscription creates a new subscription using the specified input arguments.
func (p *BlocksDataProvider) createSubscription(ctx context.Context, args blocksArguments) subscription.Subscription {
if args.StartBlockID != flow.ZeroID {
return p.api.SubscribeBlocksFromStartBlockID(ctx, args.StartBlockID, args.BlockStatus)
}
if args.StartBlockHeight != request.EmptyHeight {
return p.api.SubscribeBlocksFromStartHeight(ctx, args.StartBlockHeight, args.BlockStatus)
}
return p.api.SubscribeBlocksFromLatest(ctx, args.BlockStatus)
}
// ParseBlocksArguments validates and initializes the blocks arguments.
func ParseBlocksArguments(arguments models.Arguments) (blocksArguments, error) {
var args blocksArguments
// Parse 'block_status'
if blockStatusIn, ok := arguments["block_status"]; ok {
result, ok := blockStatusIn.(string)
if !ok {
return args, fmt.Errorf("'block_status' must be string")
}
blockStatus, err := parser.ParseBlockStatus(result)
if err != nil {
return args, err
}
args.BlockStatus = blockStatus
} else {
return args, fmt.Errorf("'block_status' must be provided")
}
// Parse block arguments
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
if err != nil {
return args, err
}
args.StartBlockID = startBlockID
args.StartBlockHeight = startBlockHeight
return args, nil
}
func ParseStartBlock(arguments models.Arguments) (flow.Identifier, uint64, error) {
startBlockIDIn, hasStartBlockID := arguments["start_block_id"]
startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"]
// Check for mutual exclusivity of start_block_id and start_block_height early
if hasStartBlockID && hasStartBlockHeight {
return flow.ZeroID, 0, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'")
}
// Parse 'start_block_id'
if hasStartBlockID {
result, ok := startBlockIDIn.(string)
if !ok {
return flow.ZeroID, request.EmptyHeight, fmt.Errorf("'start_block_id' must be a string")
}
var startBlockID parser.ID
err := startBlockID.Parse(result)
if err != nil {
return flow.ZeroID, request.EmptyHeight, fmt.Errorf("invalid 'start_block_id': %w", err)
}
return startBlockID.Flow(), request.EmptyHeight, nil
}
// Parse 'start_block_height'
if hasStartBlockHeight {
result, ok := startBlockHeightIn.(string)
if !ok {
return flow.ZeroID, 0, fmt.Errorf("'start_block_height' must be a string")
}
startBlockHeight, err := util.ToUint64(result)
if err != nil {
return flow.ZeroID, request.EmptyHeight, fmt.Errorf("invalid 'start_block_height': %w", err)
}
return flow.ZeroID, startBlockHeight, nil
}
return flow.ZeroID, request.EmptyHeight, nil
}