Skip to content

[Access] Properly handle subscription errors in data providers #7046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8f6ca1a
Properly handle subscription errors in data providers
illia-malachyn Feb 17, 2025
acb406d
change reciever name to match previous
illia-malachyn Feb 17, 2025
8230193
Add one shot close channel to all data providers
illia-malachyn Feb 18, 2025
24e95b8
do not spawn unnecessary routine in close()
illia-malachyn Feb 18, 2025
a5ef7f3
remove unused func
illia-malachyn Feb 19, 2025
be75dbc
null blockSinceLastMessage only if we sent a block
illia-malachyn Feb 19, 2025
dfa115e
remove unnecessary prevIndex variable
illia-malachyn Feb 21, 2025
feff7e5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Feb 21, 2025
7bcecd3
add comment for run()
illia-malachyn Feb 24, 2025
06891f8
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 13, 2025
41a80c5
Rework how data providers start serving data
illia-malachyn Mar 18, 2025
3b67bb3
fix comments
illia-malachyn Mar 18, 2025
cdf4d0f
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 18, 2025
772c40b
run code formatter
illia-malachyn Mar 18, 2025
a149dc5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 19, 2025
cf95fcb
improve some comments for providers
illia-malachyn Mar 19, 2025
7351588
remove IDEs refactoring of comments
illia-malachyn Mar 19, 2025
fbab1b9
remove done channel. add comment about streamer contract for ctx.Canc…
illia-malachyn Mar 20, 2025
3c2f22f
pass ctx to provider constructor again
illia-malachyn Mar 20, 2025
6266d3b
remove subscription from provider struct
illia-malachyn Mar 20, 2025
18f0da5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 20, 2025
429b961
remove unnecessary callbacks
illia-malachyn Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
Expand Down Expand Up @@ -90,7 +88,46 @@ func NewAccountStatusesDataProvider(
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *AccountStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
messageIndex := counters.NewMonotonicCounter(0)
blocksSinceLastMessage := uint64(0)

return run(
p.closedChan,
p.subscription,
func(response *backend.AccountStatusesResponse) error {
return p.sendResponse(response, &messageIndex, &blocksSinceLastMessage)
},
)
}

// sendResponse processes an account statuses message and sends it to data provider's channel.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) sendResponse(
response *backend.AccountStatusesResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send
// or the heartbeat interval limit is reached
*blocksSinceLastMessage += 1
accountEmittedEvents := len(response.AccountEvents) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.heartbeatInterval
if !accountEmittedEvents && !reachedHeartbeatLimit {
return nil
}

var accountStatusesPayload models.AccountStatusesResponse
accountStatusesPayload.Build(response, messageIndex.Value())
messageIndex.Increment()

var resp models.BaseDataProvidersResponse
resp.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &resp
*blocksSinceLastMessage = 0

return nil
}

// createSubscription creates a new subscription using the specified input arguments.
Expand All @@ -106,41 +143,6 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)

return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(accountStatusesResponse.AccountEvents) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

var accountStatusesPayload models.AccountStatusesResponse
accountStatusesPayload.Build(accountStatusesResponse, index)

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &response

return nil
}
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseAccountStatusesArguments(
arguments models.Arguments,
Expand Down
66 changes: 66 additions & 0 deletions engine/access/rest/websockets/data_providers/base_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package data_providers

import (
"context"
"fmt"
"sync"

"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
Expand All @@ -15,6 +17,9 @@ type baseDataProvider struct {
cancel context.CancelFunc
send chan<- interface{}
subscription subscription.Subscription
// Ensures the closedChan has been closed once.
closedFlag sync.Once
closedChan chan struct{}
}

// newBaseDataProvider creates a new instance of baseDataProvider.
Expand All @@ -33,6 +38,8 @@ func newBaseDataProvider(
cancel: cancel,
send: send,
subscription: subscription,
closedFlag: sync.Once{},
closedChan: make(chan struct{}),
}
}

Expand All @@ -56,4 +63,63 @@ func (b *baseDataProvider) Arguments() models.Arguments {
// No errors are expected during normal operations.
func (b *baseDataProvider) Close() {
b.cancel()
b.closedFlag.Do(func() {
close(b.closedChan)
})
}

type sendResponseCallback[T any] func(T) error

// run reads data from a subscription and sends it to clients using the provided
// sendResponse callback. It continuously listens to the subscription's data
// channel and forwards the received values until the closedChan is closed or
// the subscription ends. It is used as a helper function for each data provider's
// Run() function.
//
// Parameters:
// - closedChan: A channel to signal the termination of the function. When closed,
// the function stops reading from the subscription and exits gracefully.
// - subscription: An instance of the Subscription interface, which provides a
// data stream through its Channel() method and an optional error through Err().
// - sendResponse: A callback function that processes and forwards the received
// data to the clients (e.g. a WebSocket controller). If the callback
// returns an error, the function terminates with that error.
//
// Returns:
// - error: If any error occurs while reading from the subscription or sending
// responses, it returns an error wrapped with additional context. If the
// closedChan is closed or the subscription ends without errors, it returns nil.
//
// Errors:
// - If the subscription ends with an error, it is wrapped and returned.
// - If a received value is not of the expected type (T), an error is returned.
// - If the sendResponse callback encounters an error, it is wrapped and returned.
func run[T any](
closedChan <-chan struct{},
subscription subscription.Subscription,
sendResponse sendResponseCallback[T],
) error {
for {
select {
case <-closedChan:
return nil
case value, ok := <-subscription.Channel():
if !ok {
if subscription.Err() != nil {
return fmt.Errorf("subscription finished with error: %w", subscription.Err())
}
return nil
}

response, ok := value.(T)
if !ok {
return fmt.Errorf("unexpected response type: %T", value)
}

err := sendResponse(response)
if err != nil {
return fmt.Errorf("error sending response: %w", err)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ func NewBlockDigestsDataProvider(
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *BlockDigestsDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(b *flow.BlockDigest) (interface{}, error) {
var block models.BlockDigest
block.Build(b)
func(b *flow.BlockDigest) error {
var blockDigest models.BlockDigest
blockDigest.Build(b)

var response models.BaseDataProvidersResponse
response.Build(
p.ID(),
p.Topic(),
&block,
)

return &response, nil
}),
response.Build(p.ID(), p.Topic(), &blockDigest)
p.send <- &response

return nil
},
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,19 @@ func NewBlockHeadersDataProvider(
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *BlockHeadersDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(h *flow.Header) (interface{}, error) {
func(h *flow.Header) error {
var header commonmodels.BlockHeader
header.Build(h)

var response models.BaseDataProvidersResponse
response.Build(
p.ID(),
p.Topic(),
&header,
)

return &response, nil
}),
response.Build(p.ID(), p.Topic(), &header)
p.send <- &response

return nil
},
)
}

Expand Down
18 changes: 10 additions & 8 deletions engine/access/rest/websockets/data_providers/blocks_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,24 @@ func NewBlocksDataProvider(
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *BlocksDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(b *flow.Block) (interface{}, error) {
var block commonmodels.Block
func(block *flow.Block) error {
var blockResponse commonmodels.Block

expandPayload := map[string]bool{commonmodels.ExpandableFieldPayload: true}
err := block.Build(b, nil, p.linkGenerator, p.arguments.BlockStatus, expandPayload)
err := blockResponse.Build(block, nil, p.linkGenerator, p.arguments.BlockStatus, expandPayload)
if err != nil {
return nil, fmt.Errorf("failed to build block response :%w", err)
return fmt.Errorf("failed to build block response :%w", err)
}

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &block)
response.Build(p.ID(), p.Topic(), &blockResponse)
p.send <- &response

return &response, nil
}),
return nil
},
)
}

Expand Down
59 changes: 30 additions & 29 deletions engine/access/rest/websockets/data_providers/events_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,42 +89,43 @@ func NewEventsDataProvider(
// Expected errors during normal operations:
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
func (p *EventsDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
}

// handleResponse processes events and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.EventsResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)
blocksSinceLastMessage := uint64(0)

return func(eventsResponse *backend.EventsResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(eventsResponse.Events) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0
return run(
p.closedChan,
p.subscription,
func(response *backend.EventsResponse) error {
return p.sendResponse(response, &messageIndex, &blocksSinceLastMessage)
},
)
}

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return fmt.Errorf("message index already incremented to: %d", messageIndex.Value())
}
func (p *EventsDataProvider) sendResponse(
eventsResponse *backend.EventsResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send
// or the heartbeat interval limit is reached
*blocksSinceLastMessage += 1
contractEmittedEvents := len(eventsResponse.Events) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.heartbeatInterval
if !contractEmittedEvents && !reachedHeartbeatLimit {
return nil
}

var eventsPayload models.EventResponse
eventsPayload.Build(eventsResponse, index)
var eventsPayload models.EventResponse
eventsPayload.Build(eventsResponse, messageIndex.Value())
messageIndex.Increment()

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &eventsPayload)
var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &eventsPayload)

p.send <- &response
p.send <- &response
*blocksSinceLastMessage = 0

return nil
}
return nil
}

// createSubscription creates a new subscription using the specified input arguments.
Expand Down
Loading
Loading