-
Notifications
You must be signed in to change notification settings - Fork 185
/
Copy pathblock_headers_provider.go
96 lines (80 loc) · 2.53 KB
/
block_headers_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package data_providers
import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/access"
commonmodels "github.com/onflow/flow-go/engine/access/rest/common/models"
"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
)
// BlockHeadersDataProvider is responsible for providing block headers.
//
// It opens a block header subscription through the access API and forwards
// each received header, wrapped in a response message, to the send channel
// supplied at construction.
type BlockHeadersDataProvider struct {
// Embedded base provider; it supplies the closedChan, subscription and send
// fields that Run reads.
*baseDataProvider
// logger is tagged with the "block-headers-data-provider" component name by
// the constructor.
logger zerolog.Logger
// api is the access API used by createSubscription to open the block header
// subscription.
api access.API
}
// Compile-time assertion that *BlockHeadersDataProvider satisfies DataProvider.
var _ DataProvider = (*BlockHeadersDataProvider)(nil)
// NewBlockHeadersDataProvider builds a BlockHeadersDataProvider for a single
// subscription.
//
// The raw arguments are validated with ParseBlocksArguments; a parse failure
// is returned wrapped as "invalid arguments". On success a cancellable child
// of ctx is derived, the block header subscription is opened on it, and both
// the cancel function and the subscription are handed to the embedded base
// provider together with subscriptionID, topic, arguments and send.
func NewBlockHeadersDataProvider(
	ctx context.Context,
	logger zerolog.Logger,
	api access.API,
	subscriptionID string,
	topic string,
	arguments models.Arguments,
	send chan<- interface{},
) (*BlockHeadersDataProvider, error) {
	// Reject malformed arguments before anything else is set up.
	parsed, parseErr := ParseBlocksArguments(arguments)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid arguments: %w", parseErr)
	}

	provider := &BlockHeadersDataProvider{
		logger: logger.With().Str("component", "block-headers-data-provider").Logger(),
		api:    api,
	}

	// The subscription runs on a child context so that it can be cancelled
	// through the cancel function owned by the base provider.
	subCtx, cancel := context.WithCancel(ctx)
	headerSub := provider.createSubscription(subCtx, parsed)

	provider.baseDataProvider = newBaseDataProvider(
		subscriptionID,
		topic,
		arguments,
		cancel,
		send,
		headerSub,
	)

	return provider, nil
}
// Run consumes the block header subscription and forwards every header to the
// send channel as a response message tagged with this provider's ID and topic.
//
// No errors are expected during normal operations.
func (p *BlockHeadersDataProvider) Run() error {
	// forward converts one raw header into the REST model, wraps it in a
	// response and pushes it to the consumer.
	forward := func(raw *flow.Header) error {
		payload := new(commonmodels.BlockHeader)
		payload.Build(raw)

		msg := new(models.BaseDataProvidersResponse)
		msg.Build(p.ID(), p.Topic(), payload)

		// Plain channel send: it waits until the channel can accept the message.
		p.send <- msg
		return nil
	}

	return run(p.closedChan, p.subscription, forward)
}
// createSubscription opens a block header subscription matching args.
//
// The starting point is chosen in this order: an explicit start block ID, then
// an explicit start height, and otherwise the latest block. args.BlockStatus
// is passed through to the access API in every case.
func (p *BlockHeadersDataProvider) createSubscription(ctx context.Context, args blocksArguments) subscription.Subscription {
	status := args.BlockStatus

	switch {
	case args.StartBlockID != flow.ZeroID:
		return p.api.SubscribeBlockHeadersFromStartBlockID(ctx, args.StartBlockID, status)
	case args.StartBlockHeight != request.EmptyHeight:
		return p.api.SubscribeBlockHeadersFromStartHeight(ctx, args.StartBlockHeight, status)
	default:
		return p.api.SubscribeBlockHeadersFromLatest(ctx, status)
	}
}