Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Properly handle subscription errors in data providers #7046

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8f6ca1a
Properly handle subscription errors in data providers
illia-malachyn Feb 17, 2025
acb406d
change reciever name to match previous
illia-malachyn Feb 17, 2025
8230193
Add one shot close channel to all data providers
illia-malachyn Feb 18, 2025
24e95b8
do not spawn unnecessary routine in close()
illia-malachyn Feb 18, 2025
a5ef7f3
remove unused func
illia-malachyn Feb 19, 2025
be75dbc
null blockSinceLastMessage only if we sent a block
illia-malachyn Feb 19, 2025
dfa115e
remove unnecessary prevIndex variable
illia-malachyn Feb 21, 2025
feff7e5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Feb 21, 2025
7bcecd3
add comment for run()
illia-malachyn Feb 24, 2025
06891f8
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 13, 2025
41a80c5
Rework how data providers start serving data
illia-malachyn Mar 18, 2025
3b67bb3
fix comments
illia-malachyn Mar 18, 2025
cdf4d0f
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 18, 2025
772c40b
run code formatter
illia-malachyn Mar 18, 2025
a149dc5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 19, 2025
cf95fcb
improve some comments for providers
illia-malachyn Mar 19, 2025
7351588
remove IDEs refactoring of comments
illia-malachyn Mar 19, 2025
fbab1b9
remove done channel. add comment about streamer contract for ctx.Canc…
illia-malachyn Mar 20, 2025
3c2f22f
pass ctx to provider constructor again
illia-malachyn Mar 20, 2025
6266d3b
remove subscription from provider struct
illia-malachyn Mar 20, 2025
18f0da5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 20, 2025
429b961
remove unnecessary callbacks
illia-malachyn Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions engine/access/rest/common/models/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,68 @@ import (
const ExpandableFieldPayload = "payload"
const ExpandableExecutionResult = "execution_result"

func NewBlock(
block *flow.Block,
execResult *flow.ExecutionResult,
link LinkGenerator,
blockStatus flow.BlockStatus,
expand map[string]bool,
) (*Block, error) {
self, err := SelfLink(block.ID(), link.BlockLink)
if err != nil {
return nil, err
}

var result Block
result.Header = NewBlockHeader(block.Header)

// add the payload to the response if it is specified as an expandable field
result.Expandable = &BlockExpandable{}
if expand[ExpandableFieldPayload] {
var payload BlockPayload
err := payload.Build(block.Payload)
if err != nil {
return nil, err
}
result.Payload = &payload
} else {
// else add the payload expandable link
payloadExpandable, err := link.PayloadLink(block.ID())
if err != nil {
return nil, err
}
result.Expandable.Payload = payloadExpandable
}

// execution result might not yet exist
if execResult != nil {
// add the execution result to the response if it is specified as an expandable field
if expand[ExpandableExecutionResult] {
var exeResult ExecutionResult
err := exeResult.Build(execResult, link)
if err != nil {
return nil, err
}
result.ExecutionResult = &exeResult
} else {
// else add the execution result expandable link
executionResultExpandable, err := link.ExecutionResultLink(execResult.ID())
if err != nil {
return nil, err
}
result.Expandable.ExecutionResult = executionResultExpandable
}
}

result.Links = self

var status BlockStatus
status.Build(blockStatus)
result.BlockStatus = &status

return &result, nil
}

func (b *Block) Build(
block *flow.Block,
execResult *flow.ExecutionResult,
Expand Down Expand Up @@ -99,6 +161,16 @@ func (b *BlockPayload) Build(payload *flow.Payload) error {
return nil
}

func NewBlockHeader(header *flow.Header) *BlockHeader {
return &BlockHeader{
Id: header.ID().String(),
ParentId: header.ParentID.String(),
Height: util.FromUint(header.Height),
Timestamp: header.Timestamp,
ParentVoterSignature: util.ToBase64(header.ParentVoterSigData),
}
}

func (b *BlockHeader) Build(header *flow.Header) {
b.Id = header.ID().String()
b.ParentId = header.ParentID.String()
Expand Down
4 changes: 1 addition & 3 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,7 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
c.dataProvidersGroup.Add(1)
go func() {
err = provider.Run()
// return the error to the client for all errors except context.Canceled.
// context.Canceled is returned during graceful shutdown of a subscription
if err != nil && !errors.Is(err, context.Canceled) {
if err != nil {
err = fmt.Errorf("internal error: %w", err)
c.writeErrorResponse(
ctx,
Expand Down
12 changes: 6 additions & 6 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -206,7 +206,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {}).
Return(fmt.Errorf("error running data provider")).
Once()
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -329,7 +329,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -399,7 +399,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
Expand All @@ -23,16 +21,16 @@ type accountStatusesArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
HeartbeatInterval *uint64 // Maximum number of blocks message won't be sent. Nil if not set
HeartbeatInterval uint64 // Maximum number of blocks message won't be sent
}

type AccountStatusesDataProvider struct {
*baseDataProvider

logger zerolog.Logger
stateStreamApi state_stream.API

heartbeatInterval uint64
arguments accountStatusesArguments
messageIndex counters.StrictMonotonicCounter
blocksSinceLastMessage uint64
stateStreamApi state_stream.API
}

var _ DataProvider = (*AccountStatusesDataProvider)(nil)
Expand All @@ -44,55 +42,86 @@ func NewAccountStatusesDataProvider(
stateStreamApi state_stream.API,
subscriptionID string,
topic string,
arguments wsmodels.Arguments,
rawArguments wsmodels.Arguments,
send chan<- interface{},
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
heartbeatInterval uint64,
defaultHeartbeatInterval uint64,
) (*AccountStatusesDataProvider, error) {
if stateStreamApi == nil {
return nil, fmt.Errorf("this access node does not support streaming account statuses")
}

p := &AccountStatusesDataProvider{
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(),
stateStreamApi: stateStreamApi,
heartbeatInterval: heartbeatInterval,
}

// Initialize arguments passed to the provider.
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig)
args, err := parseAccountStatusesArguments(rawArguments, chain, eventFilterConfig, defaultHeartbeatInterval)
if err != nil {
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
}
if accountStatusesArgs.HeartbeatInterval != nil {
p.heartbeatInterval = *accountStatusesArgs.HeartbeatInterval
}

subCtx, cancel := context.WithCancel(ctx)

p.baseDataProvider = newBaseDataProvider(
provider := newBaseDataProvider(
ctx,
logger.With().Str("component", "account-statuses-data-provider").Logger(),
nil,
subscriptionID,
topic,
arguments,
cancel,
rawArguments,
send,
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
)

return p, nil
return &AccountStatusesDataProvider{
baseDataProvider: provider,
arguments: args,
messageIndex: counters.NewMonotonicCounter(0),
blocksSinceLastMessage: 0,
stateStreamApi: stateStreamApi,
}, nil
}

// Run starts processing the subscription for events and handles responses.
// Must be called once.
//
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
// No errors expected during normal operations.
func (p *AccountStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
return run(
p.createAndStartSubscription(p.ctx, p.arguments),
func(response *backend.AccountStatusesResponse) error {
return p.sendResponse(response)
},
)
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
// sendResponse processes an account statuses message and sends it to data provider's channel.
// This function is not safe to call concurrently.
//
// No errors are expected during normal operations
func (p *AccountStatusesDataProvider) sendResponse(response *backend.AccountStatusesResponse) error {
// Only send a response if there's meaningful data to send
// or the heartbeat interval limit is reached
p.blocksSinceLastMessage += 1
accountEmittedEvents := len(response.AccountEvents) != 0
reachedHeartbeatLimit := p.blocksSinceLastMessage >= p.arguments.HeartbeatInterval
if !accountEmittedEvents && !reachedHeartbeatLimit {
return nil
}

accountStatusesPayload := models.NewAccountStatusesResponse(response, p.messageIndex.Value())
resp := models.BaseDataProvidersResponse{
SubscriptionID: p.ID(),
Topic: p.Topic(),
Payload: accountStatusesPayload,
}
p.send <- &resp

p.blocksSinceLastMessage = 0
p.messageIndex.Increment()

return nil
}

// createAndStartSubscription creates a new subscription using the specified input arguments.
func (p *AccountStatusesDataProvider) createAndStartSubscription(
ctx context.Context,
args accountStatusesArguments,
) subscription.Subscription {
if args.StartBlockID != flow.ZeroID {
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter)
}
Expand All @@ -104,46 +133,12 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)

return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(accountStatusesResponse.AccountEvents) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

accountStatusesPayload := models.NewAccountStatusesResponse(accountStatusesResponse, index)
response := models.BaseDataProvidersResponse{
SubscriptionID: p.ID(),
Topic: p.Topic(),
Payload: accountStatusesPayload,
}
p.send <- &response

return nil
}
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseAccountStatusesArguments(
arguments wsmodels.Arguments,
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
defaultHeartbeatInterval uint64,
) (accountStatusesArguments, error) {
allowedFields := map[string]struct{}{
"start_block_id": {},
Expand All @@ -168,7 +163,7 @@ func parseAccountStatusesArguments(
args.StartBlockHeight = startBlockHeight

// Parse 'heartbeat_interval' argument
heartbeatInterval, err := extractHeartbeatInterval(arguments)
heartbeatInterval, err := extractHeartbeatInterval(arguments, defaultHeartbeatInterval)
if err != nil {
return accountStatusesArguments{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,11 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_HappyPath
}

func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_StateStreamNotConfigured() {
ctx := context.Background()
send := make(chan interface{})

topic := AccountStatusesTopic

provider, err := NewAccountStatusesDataProvider(
ctx,
context.Background(),
s.log,
nil,
"dummy-id",
Expand Down Expand Up @@ -206,15 +204,13 @@ func (s *AccountStatusesProviderSuite) expectedAccountStatusesResponses(backendR
// when invalid arguments are provided. It verifies that appropriate errors are returned
// for missing or conflicting arguments.
func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() {
ctx := context.Background()
send := make(chan interface{})

topic := AccountStatusesTopic

for _, test := range invalidAccountStatusesArgumentsTestCases() {
s.Run(test.name, func() {
provider, err := NewAccountStatusesDataProvider(
ctx,
context.Background(),
s.log,
s.api,
"dummy-id",
Expand All @@ -234,7 +230,6 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidAr

// TestMessageIndexAccountStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing.
func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderResponse_HappyPath() {
ctx := context.Background()
send := make(chan interface{}, 10)
topic := AccountStatusesTopic
accountStatusesCount := 4
Expand All @@ -258,7 +253,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe

// Create the AccountStatusesDataProvider instance
provider, err := NewAccountStatusesDataProvider(
ctx,
context.Background(),
s.log,
s.api,
"dummy-id",
Expand Down
Loading
Loading