Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Properly handle subscription errors in data providers #7046

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8f6ca1a
Properly handle subscription errors in data providers
illia-malachyn Feb 17, 2025
acb406d
change reciever name to match previous
illia-malachyn Feb 17, 2025
8230193
Add one shot close channel to all data providers
illia-malachyn Feb 18, 2025
24e95b8
do not spawn unnecessary routine in close()
illia-malachyn Feb 18, 2025
a5ef7f3
remove unused func
illia-malachyn Feb 19, 2025
be75dbc
null blockSinceLastMessage only if we sent a block
illia-malachyn Feb 19, 2025
dfa115e
remove unnecessary prevIndex variable
illia-malachyn Feb 21, 2025
feff7e5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Feb 21, 2025
7bcecd3
add comment for run()
illia-malachyn Feb 24, 2025
06891f8
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 13, 2025
41a80c5
Rework how data providers start serving data
illia-malachyn Mar 18, 2025
3b67bb3
fix comments
illia-malachyn Mar 18, 2025
cdf4d0f
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 18, 2025
772c40b
run code formatter
illia-malachyn Mar 18, 2025
a149dc5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 19, 2025
cf95fcb
improve some comments for providers
illia-malachyn Mar 19, 2025
7351588
remove IDEs refactoring of comments
illia-malachyn Mar 19, 2025
fbab1b9
remove done channel. add comment about streamer contract for ctx.Canc…
illia-malachyn Mar 20, 2025
3c2f22f
pass ctx to provider constructor again
illia-malachyn Mar 20, 2025
6266d3b
remove subscription from provider struct
illia-malachyn Mar 20, 2025
18f0da5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 20, 2025
429b961
remove unnecessary callbacks
illia-malachyn Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
}

// register new provider
provider, err := c.dataProviderFactory.NewDataProvider(ctx, subscriptionID.String(), msg.Topic, msg.Arguments, c.multiplexedStream)
provider, err := c.dataProviderFactory.NewDataProvider(subscriptionID.String(), msg.Topic, msg.Arguments, c.multiplexedStream)
if err != nil {
err = fmt.Errorf("error creating data provider: %w", err)
c.writeErrorResponse(
Expand All @@ -478,10 +478,8 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
// run provider
c.dataProvidersGroup.Add(1)
go func() {
err = provider.Run()
// return the error to the client for all errors except context.Canceled.
// context.Canceled is returned during graceful shutdown of a subscription
if err != nil && !errors.Is(err, context.Canceled) {
err = provider.Run(ctx)
if err != nil {
err = fmt.Errorf("internal error: %w", err)
c.writeErrorResponse(
ctx,
Expand Down
32 changes: 16 additions & 16 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

done := make(chan struct{})
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("error creating data provider")).
Once()

Expand Down Expand Up @@ -206,13 +206,13 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {}).
Return(fmt.Errorf("error running data provider")).
Once()

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

Expand Down Expand Up @@ -253,15 +253,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

done := make(chan struct{})
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -321,15 +321,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

done := make(chan struct{})
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -391,15 +391,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

done := make(chan struct{})
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -464,7 +464,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

Expand All @@ -480,7 +480,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
On("Run").
On("Run", mock.Anything).
Run(func(args mock.Arguments) {
<-done
}).
Expand Down Expand Up @@ -546,7 +546,7 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

Expand Down Expand Up @@ -600,7 +600,7 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

Expand Down Expand Up @@ -805,7 +805,7 @@ func (s *WsControllerSuite) TestControllerShutdown() {
controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)

dataProviderFactory.
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(dataProvider, nil).
Once()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
Expand All @@ -25,76 +23,115 @@ type accountStatusesArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
HeartbeatInterval *uint64 // Maximum number of blocks message won't be sent. Nil if not set
HeartbeatInterval uint64 // Maximum number of blocks message won't be sent
}

type AccountStatusesDataProvider struct {
*baseDataProvider

logger zerolog.Logger
stateStreamApi state_stream.API

heartbeatInterval uint64
arguments accountStatusesArguments
messageIndex counters.StrictMonotonicCounter
blocksSinceLastMessage uint64
stateStreamApi state_stream.API
}

var _ DataProvider = (*AccountStatusesDataProvider)(nil)

// NewAccountStatusesDataProvider creates a new instance of AccountStatusesDataProvider.
func NewAccountStatusesDataProvider(
ctx context.Context,
logger zerolog.Logger,
stateStreamApi state_stream.API,
subscriptionID string,
topic string,
arguments models.Arguments,
rawArguments models.Arguments,
send chan<- interface{},
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
heartbeatInterval uint64,
defaultHeartbeatInterval uint64,
) (*AccountStatusesDataProvider, error) {
if stateStreamApi == nil {
return nil, fmt.Errorf("this access node does not support streaming account statuses")
}

p := &AccountStatusesDataProvider{
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(),
stateStreamApi: stateStreamApi,
heartbeatInterval: heartbeatInterval,
}

// Initialize arguments passed to the provider.
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig)
args, err := parseAccountStatusesArguments(rawArguments, chain, eventFilterConfig, defaultHeartbeatInterval)
if err != nil {
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
}
if accountStatusesArgs.HeartbeatInterval != nil {
p.heartbeatInterval = *accountStatusesArgs.HeartbeatInterval
}

subCtx, cancel := context.WithCancel(ctx)

p.baseDataProvider = newBaseDataProvider(
provider := newBaseDataProvider(
logger.With().Str("component", "account-statuses-data-provider").Logger(),
nil,
subscriptionID,
topic,
arguments,
cancel,
rawArguments,
send,
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
)

return p, nil
return &AccountStatusesDataProvider{
baseDataProvider: provider,
arguments: args,
messageIndex: counters.NewMonotonicCounter(0),
blocksSinceLastMessage: 0,
stateStreamApi: stateStreamApi,
}, nil
}

// Run starts processing the subscription for events and handles responses.
//
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *AccountStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
func (p *AccountStatusesDataProvider) Run(ctx context.Context) error {
// we read data from the subscription and send them to client's channel
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p.subscriptionState = newSubscriptionState(cancel, p.createAndStartSubscription(ctx, p.arguments))

// set to nils in case Run() called for the second time
p.messageIndex = counters.NewMonotonicCounter(0)
p.blocksSinceLastMessage = 0

return run(
p.baseDataProvider.done,
p.subscriptionState.subscription,
func(response *backend.AccountStatusesResponse) error {
return p.sendResponse(response, &p.messageIndex, &p.blocksSinceLastMessage)
},
)
}

// sendResponse processes an account statuses message and sends it to data provider's channel.
// This function is not expected to be called concurrently.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) sendResponse(
response *backend.AccountStatusesResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send
// or the heartbeat interval limit is reached
*blocksSinceLastMessage += 1
accountEmittedEvents := len(response.AccountEvents) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.arguments.HeartbeatInterval
if !accountEmittedEvents && !reachedHeartbeatLimit {
return nil
}

var accountStatusesPayload models.AccountStatusesResponse
accountStatusesPayload.Build(response, messageIndex.Value())
messageIndex.Increment()

var resp models.BaseDataProvidersResponse
resp.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &resp
*blocksSinceLastMessage = 0

return nil
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
// createAndStartSubscription creates a new subscription using the specified input arguments.
func (p *AccountStatusesDataProvider) createAndStartSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
if args.StartBlockID != flow.ZeroID {
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter)
}
Expand All @@ -106,46 +143,12 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)

return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(accountStatusesResponse.AccountEvents) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

var accountStatusesPayload models.AccountStatusesResponse
accountStatusesPayload.Build(accountStatusesResponse, index)

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &response

return nil
}
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseAccountStatusesArguments(
arguments models.Arguments,
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
defaultHeartbeatInterval uint64,
) (accountStatusesArguments, error) {
allowedFields := []string{
"start_block_id",
Expand Down Expand Up @@ -192,19 +195,20 @@ func parseAccountStatusesArguments(
}
}

var heartbeatInterval uint64
if heartbeatIntervalIn, ok := arguments["heartbeat_interval"]; ok && heartbeatIntervalIn != "" {
result, ok := heartbeatIntervalIn.(string)
if !ok {
return accountStatusesArguments{}, fmt.Errorf("'heartbeat_interval' must be a string")
}

heartbeatInterval, err = util.ToUint64(result)
heartbeatInterval, err := util.ToUint64(result)
if err != nil {
return accountStatusesArguments{}, fmt.Errorf("invalid 'heartbeat_interval': %w", err)
}

args.HeartbeatInterval = &heartbeatInterval
args.HeartbeatInterval = heartbeatInterval
} else {
args.HeartbeatInterval = defaultHeartbeatInterval
}

// Initialize the event filter with the parsed arguments
Expand Down
Loading
Loading