Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Properly handle subscription errors in data providers #7046

Merged
Merged
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8f6ca1a
Properly handle subscription errors in data providers
illia-malachyn Feb 17, 2025
acb406d
change reciever name to match previous
illia-malachyn Feb 17, 2025
8230193
Add one shot close channel to all data providers
illia-malachyn Feb 18, 2025
24e95b8
do not spawn unnecessary routine in close()
illia-malachyn Feb 18, 2025
a5ef7f3
remove unused func
illia-malachyn Feb 19, 2025
be75dbc
null blockSinceLastMessage only if we sent a block
illia-malachyn Feb 19, 2025
dfa115e
remove unnecessary prevIndex variable
illia-malachyn Feb 21, 2025
feff7e5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Feb 21, 2025
7bcecd3
add comment for run()
illia-malachyn Feb 24, 2025
06891f8
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 13, 2025
41a80c5
Rework how data providers start serving data
illia-malachyn Mar 18, 2025
3b67bb3
fix comments
illia-malachyn Mar 18, 2025
cdf4d0f
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 18, 2025
772c40b
run code formatter
illia-malachyn Mar 18, 2025
a149dc5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 19, 2025
cf95fcb
improve some comments for providers
illia-malachyn Mar 19, 2025
7351588
remove IDEs refactoring of comments
illia-malachyn Mar 19, 2025
fbab1b9
remove done channel. add comment about streamer contract for ctx.Canc…
illia-malachyn Mar 20, 2025
3c2f22f
pass ctx to provider constructor again
illia-malachyn Mar 20, 2025
6266d3b
remove subscription from provider struct
illia-malachyn Mar 20, 2025
18f0da5
Merge branch 'master' into illia-malachyn/7040-fix-context-canceled-e…
illia-malachyn Mar 20, 2025
429b961
remove unnecessary callbacks
illia-malachyn Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@ import (
"fmt"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/request"
@@ -83,7 +81,48 @@ func NewAccountStatusesDataProvider(
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
messageIndex := counters.NewMonotonicCounter(0)
blocksSinceLastMessage := uint64(0)

return run(
p.closedChan,
p.subscription,
func(response *backend.AccountStatusesResponse) error {
return p.sendResponse(response, &messageIndex, &blocksSinceLastMessage)
},
)
}

// sendResponse processes an account statuses message and sends it to data provider's channel.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) sendResponse(
response *backend.AccountStatusesResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send.
// The block counter increments until either:
// 1. The account emits events
// 2. The heartbeat interval is reached
*blocksSinceLastMessage += 1
accountEmittedEvents := len(response.AccountEvents) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.heartbeatInterval
if !accountEmittedEvents && !reachedHeartbeatLimit {
return nil
}

var accountStatusesPayload models.AccountStatusesResponse
defer messageIndex.Increment()
accountStatusesPayload.Build(response, messageIndex.Value())

var resp models.BaseDataProvidersResponse
resp.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &resp
*blocksSinceLastMessage = 0

return nil
}

// createSubscription creates a new subscription using the specified input arguments.
@@ -99,41 +138,6 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)

return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(accountStatusesResponse.AccountEvents) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

var accountStatusesPayload models.AccountStatusesResponse
accountStatusesPayload.Build(accountStatusesResponse, index)

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &accountStatusesPayload)

p.send <- &response

return nil
}
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseAccountStatusesArguments(
arguments models.Arguments,
42 changes: 42 additions & 0 deletions engine/access/rest/websockets/data_providers/base_provider.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ package data_providers

import (
"context"
"fmt"
"sync"

"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
@@ -15,6 +17,9 @@ type baseDataProvider struct {
cancel context.CancelFunc
send chan<- interface{}
subscription subscription.Subscription
// Ensures the closedChan has been closed once.
closedFlag sync.Once
closedChan chan struct{}
}

// newBaseDataProvider creates a new instance of baseDataProvider.
@@ -33,6 +38,8 @@ func newBaseDataProvider(
cancel: cancel,
send: send,
subscription: subscription,
closedFlag: sync.Once{},
closedChan: make(chan struct{}, 1),
}
}

@@ -56,4 +63,39 @@ func (b *baseDataProvider) Arguments() models.Arguments {
// No errors are expected during normal operations.
func (b *baseDataProvider) Close() {
b.cancel()
b.closedFlag.Do(func() {
close(b.closedChan)
})
}

type sendResponseCallback[T any] func(T) error

func run[T any](
closedChan <-chan struct{},
subscription subscription.Subscription,
sendResponse sendResponseCallback[T],
) error {
for {
select {
case <-closedChan:
return nil
case value, ok := <-subscription.Channel():
if !ok {
if subscription.Err() != nil {
return fmt.Errorf("subscription finished with error: %w", subscription.Err())
}
return nil
}

response, ok := value.(T)
if !ok {
return fmt.Errorf("unexpected response type: %T", value)
}

err := sendResponse(response)
if err != nil {
return fmt.Errorf("error sending response: %w", err)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -61,21 +61,23 @@ func NewBlockDigestsDataProvider(
//
// No errors are expected during normal operations.
func (p *BlockDigestsDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(b *flow.BlockDigest) (interface{}, error) {
var block models.BlockDigest
block.Build(b)
func(b *flow.BlockDigest) error {
var blockDigest models.BlockDigest
blockDigest.Build(b)

var response models.BaseDataProvidersResponse
response.Build(
p.ID(),
p.Topic(),
&block,
&blockDigest,
)
p.send <- &response

return &response, nil
}),
return nil
},
)
}

Original file line number Diff line number Diff line change
@@ -62,9 +62,10 @@ func NewBlockHeadersDataProvider(
//
// No errors are expected during normal operations.
func (p *BlockHeadersDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(h *flow.Header) (interface{}, error) {
func(h *flow.Header) error {
var header commonmodels.BlockHeader
header.Build(h)

@@ -74,9 +75,10 @@ func (p *BlockHeadersDataProvider) Run() error {
p.Topic(),
&header,
)
p.send <- &response

return &response, nil
}),
return nil
},
)
}

18 changes: 10 additions & 8 deletions engine/access/rest/websockets/data_providers/blocks_provider.go
Original file line number Diff line number Diff line change
@@ -76,22 +76,24 @@ func NewBlocksDataProvider(
//
// No errors are expected during normal operations.
func (p *BlocksDataProvider) Run() error {
return subscription.HandleSubscription(
return run(
p.closedChan,
p.subscription,
subscription.HandleResponse(p.send, func(b *flow.Block) (interface{}, error) {
var block commonmodels.Block
func(block *flow.Block) error {
var blockResponse commonmodels.Block

expandPayload := map[string]bool{commonmodels.ExpandableFieldPayload: true}
err := block.Build(b, nil, p.linkGenerator, p.arguments.BlockStatus, expandPayload)
err := blockResponse.Build(block, nil, p.linkGenerator, p.arguments.BlockStatus, expandPayload)
if err != nil {
return nil, fmt.Errorf("failed to build block response :%w", err)
return fmt.Errorf("failed to build block response :%w", err)
}

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &block)
response.Build(p.ID(), p.Topic(), &blockResponse)
p.send <- &response

return &response, nil
}),
return nil
},
)
}

61 changes: 32 additions & 29 deletions engine/access/rest/websockets/data_providers/events_provider.go
Original file line number Diff line number Diff line change
@@ -82,42 +82,45 @@ func NewEventsDataProvider(
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
}

// handleResponse processes events and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.EventsResponse) error {
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonicCounter(0)
blocksSinceLastMessage := uint64(0)

return func(eventsResponse *backend.EventsResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(eventsResponse.Events) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
}
blocksSinceLastMessage = 0
return run(
p.closedChan,
p.subscription,
func(response *backend.EventsResponse) error {
return p.sendResponse(response, &messageIndex, &blocksSinceLastMessage)
},
)
}

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return fmt.Errorf("message index already incremented to: %d", messageIndex.Value())
}
func (p *EventsDataProvider) sendResponse(
eventsResponse *backend.EventsResponse,
messageIndex *counters.StrictMonotonicCounter,
blocksSinceLastMessage *uint64,
) error {
// Only send a response if there's meaningful data to send.
// The block counter increments until either:
// 1. The contract emits events
// 2. The heartbeat interval is reached
*blocksSinceLastMessage += 1
contractEmittedEvents := len(eventsResponse.Events) != 0
reachedHeartbeatLimit := *blocksSinceLastMessage >= p.heartbeatInterval
if !contractEmittedEvents && !reachedHeartbeatLimit {
return nil
}

var eventsPayload models.EventResponse
eventsPayload.Build(eventsResponse, index)
var eventsPayload models.EventResponse
defer messageIndex.Increment()
eventsPayload.Build(eventsResponse, messageIndex.Value())

var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &eventsPayload)
var response models.BaseDataProvidersResponse
response.Build(p.ID(), p.Topic(), &eventsPayload)

p.send <- &response
p.send <- &response
*blocksSinceLastMessage = 0

return nil
}
return nil
}

// createSubscription creates a new subscription using the specified input arguments.
Loading