[Access] Properly handle subscription errors in data providers #7046
Distinguish between `context.Canceled` errors originating from the streamer and those triggered by the DataProvider’s `Close()` method. Use `wasClosedByClient()` to suppress expected cancellations while propagating unexpected ones
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #7046 +/- ##
========================================
Coverage 41.27% 41.27%
========================================
Files 2170 2170
Lines 190047 190154 +107
========================================
+ Hits 78438 78484 +46
- Misses 105070 105122 +52
- Partials 6539 6548 +9
Looks good to me!
After discussing with Yurii, we agreed to use a different approach here.
As this will change, I will re-approve the final implementation.
We use it to distinguish where the `ctx.Canceled` error comes from. Also, I refactored each data provider's `Run()` function. Now it's more readable and clear.
Why is this refactor needed? Can you point out the important changes? It's not clear to me.
Hey. I updated the PR's description and added context on what has been done and why.
engine/access/rest/websockets/data_providers/account_statuses_provider.go
@peterargue I also pointed out the most important lines of code.
The first round of review - DONE! I have a few small comments.
Looks good!
@@ -56,4 +63,63 @@ func (b *baseDataProvider) Arguments() models.Arguments {
// No errors are expected during normal operations.
func (b *baseDataProvider) Close() {
	b.cancel()
	b.closedFlag.Do(func() {
why was this needed? this type of pattern often points to a design problem.
could we just use the context's `Done()` channel for `closedChan`?
The use of `closedFlag` ensures that the done channel is closed exactly once, even if `Close()` is called multiple times.
We expect clients to call `Run()` and `Close()` from different goroutines. This makes sense because `Run()` is blocking, and clients need to call `Close()` concurrently to stop receiving data.
Since a client might have multiple exit conditions triggering `Close()`, we handle the idempotency internally for simplicity rather than requiring clients to manage it themselves. This prevents potential panics from closing an already-closed channel.
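A minimal sketch of the idempotent `Close()` described above. The `provider` type and the `isClosed` helper are hypothetical stand-ins for illustration only, not the actual `baseDataProvider`; only the `sync.Once` guard around `close()` mirrors the comment.

```go
package main

import (
	"fmt"
	"sync"
)

// provider is a hypothetical stand-in for a data provider; only the
// shutdown-related fields are modelled here.
type provider struct {
	done     chan struct{}
	doneOnce sync.Once
}

func newProvider() *provider {
	return &provider{done: make(chan struct{})}
}

// Close may be called any number of times from any goroutine.
// sync.Once guarantees that close(p.done) runs exactly once, so a
// second call cannot panic on an already-closed channel.
func (p *provider) Close() {
	p.doneOnce.Do(func() { close(p.done) })
}

// isClosed reports whether Close has been called at least once.
func (p *provider) isClosed() bool {
	select {
	case <-p.done:
		return true
	default:
		return false
	}
}

func main() {
	p := newProvider()

	// Several exit paths race to close the provider.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Close()
		}()
	}
	wg.Wait()

	fmt.Println("closed:", p.isClosed()) // closed: true
}
```

Without the `sync.Once`, the second `close(p.done)` would panic, which is the failure mode the comment refers to.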
Indeed, we can use a `ctx` instead of a done channel here. However, I'm not sure how to do so correctly without storing `ctx` in the struct. Will ping you once it is ready.
Actually, we cannot use a `ctx` safely. Read a PR & linked issue description for it.
As discussed at the meeting, using `context.Context` is possible but has disadvantages. If we use it, we may run into the case where we suppress a real error from an access node and silently shut down without notifying the user.
@peterargue I thought maybe it is still better to keep the `done` channel? In this case, we fix the potential issue and we don't have to worry about it. It is basically a couple of lines of code. I think it is worth it for such a 'complex' problem. I pushed a refactored version with a `done` channel. What do you think?
> Read a PR & linked issue description for it.

@illia-malachyn can you add a pointer to that PR/issue here.

> In this case, we fix the potential issue and we don't have to worry about it.

which issue are you referring to?
I'm not clear how using a separate done channel is different from a context's done channel wrt lost errors. It seems like it's possible in both cases.
I mean this PR's description and the related issue. I'll describe it once again.
Let's assume we're at the place where we got the `context.Canceled` error from a subscription.
https://github.com/onflow/flow-go/pull/7046/files#diff-5cbca3503bb00261318db4a8b8b1714447f7348a4d87e11aaf8b37b43b36e2bbR120-R128
We cannot know exactly what happened - is it a controller who initiated a shutdown or has something happened in the subscription? Depending on who initiated the shutdown, we have to behave differently. If it is a streamer's/subscription's error, we have to propagate it and react to it. If it is the controller, we want to suppress it and return nil.
As we cannot differentiate who created this error, I suggest using a `done` channel for the graceful shutdown of a provider. In that case, we treat every `ctx.Canceled` error as something bad and react appropriately. If `done` is closed, we return `nil`.
I'm not sure the above link was created well. I'm speaking of this place in the `run()` function:
```go
case value, ok := <-subscription.Channel():
	if !ok {
		err := subscription.Err()
		if err != nil {
			return fmt.Errorf("subscription finished with error: %w", err) // who cancelled the context??
		}
		return nil
	}
```
We still may run into a race condition if 2 events happen concurrently (controller shutdowns provider and error happening in subscription/streamer) but it is fine as it is a "natural" concurrency.
With the `done` channel, we remove the ambiguity at this error site and can handle a shutdown request gracefully in the normal flow, when there's no concurrency.
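A rough sketch of the `done`-channel approach argued for above, under stated assumptions: the `subscription` struct, `run`, and the two scenario helpers are hypothetical simplifications, not the real flow-go types. The point is only that a closed `done` channel yields `nil`, while any error reported by the subscription, including `context.Canceled`, is propagated.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subscription is a hypothetical, minimal stand-in for the real
// subscription type: a data channel plus a terminal error.
type subscription struct {
	ch  chan int
	err error // must be set before ch is closed
}

// run consumes values until the subscription ends or done is closed.
// A closed done channel means the client asked to stop, so nil is returned.
// Any subscription error is treated as unexpected and propagated.
func run(done <-chan struct{}, sub *subscription, handle func(int)) error {
	for {
		select {
		case <-done:
			return nil
		case v, ok := <-sub.ch:
			if !ok {
				if sub.err != nil {
					return fmt.Errorf("subscription finished with error: %w", sub.err)
				}
				return nil
			}
			handle(v)
		}
	}
}

// closedByClient simulates Close() being called while no data is pending.
func closedByClient() error {
	done := make(chan struct{})
	close(done)
	sub := &subscription{ch: make(chan int)}
	return run(done, sub, func(int) {})
}

// endedByStreamer simulates the streamer ending the subscription with
// context.Canceled while the client has not asked to stop.
func endedByStreamer() error {
	sub := &subscription{ch: make(chan int), err: context.Canceled}
	close(sub.ch)
	return run(make(chan struct{}), sub, func(int) {})
}

func main() {
	fmt.Println(closedByClient())                               // <nil>
	fmt.Println(errors.Is(endedByStreamer(), context.Canceled)) // true
}
```

If both `done` and the subscription channel become ready at the same moment, `select` picks one at random, which corresponds to the "natural" concurrency mentioned in the comment.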
> We cannot know exactly what happened - is it a controller who initiated a shutdown or has something happened in the subscription?

Looking over the code, I think we're overthinking this.
The only way the subscription's error is `context.Canceled` is if the context passed to the streamer was canceled. That context is passed by the data provider, which originally comes from the controller. This means that if the error is context canceled, either:
- The controller called the provider's `Close()` method.
- The controller's parent context was cancelled, signalling the node is shutting down.

I think a simple check if the error is context canceled in the conditional should be all that's needed.
I think it's OK for now to handle server shutdowns by gracefully shutting down the connection. Ideally, we'd signal the shutdown with an error to the user, but that can come later. One note from looking over the code, when the controller's context is canceled (server shutdown), we start dropping all messages, so any error response would be dropped anyway:
flow-go/engine/access/rest/websockets/controller.go
Lines 568 to 574 in a149dc5
```go
func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
	select {
	case <-ctx.Done():
		return
	case c.multiplexedStream <- response:
	}
}
```
> We still may run into a race condition if 2 events happen concurrently (controller shutdowns provider and error happening in subscription/streamer)

I don't think it's possible for there to be 2 errors happening simultaneously. The streamer is single threaded and handles each message serially. If the context is canceled, it will detect that before trying to process more data. If the subscription encountered an error at the same time as the context was canceled, the streamer would handle that error before checking the context at the beginning of the loop. If there are any codepaths that violate this, we should fix them.
my proposal is to simply use
```go
value, ok := <-subscription.Channel()
if !ok {
	err := subscription.Err()
	if err != nil && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("subscription finished with error: %w", err)
	}
	return nil
}
...
```
this removes the need for the extra done channel, and we can simply call cancel in the `Close()` method
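To illustrate how the proposed `errors.Is` check behaves, here is a small self-contained sketch. The `finish`, `cancelledErr`, and `otherErr` names are hypothetical helpers made up for this example; only the conditional itself is taken from the proposal above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// finish maps the subscription's terminal error to the provider's return
// value, using the conditional from the proposal: context.Canceled is
// treated as a graceful shutdown, anything else is propagated.
func finish(subErr error) error {
	if subErr != nil && !errors.Is(subErr, context.Canceled) {
		return fmt.Errorf("subscription finished with error: %w", subErr)
	}
	return nil
}

// cancelledErr returns the error a context reports after cancel() is called.
func cancelledErr() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx.Err()
}

// otherErr is a made-up example of an unrelated failure.
func otherErr() error {
	return errors.New("backend unavailable")
}

func main() {
	fmt.Println(finish(cancelledErr())) // <nil>

	// errors.Is unwraps, so a wrapped cancellation is suppressed as well.
	wrapped := fmt.Errorf("streamer stopped: %w", context.Canceled)
	fmt.Println(finish(wrapped)) // <nil>

	fmt.Println(finish(otherErr())) // subscription finished with error: backend unavailable
}
```

The check cannot tell who cancelled the context, so it relies on the argument made above that the only context reaching the streamer is the one owned by the controller.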
- Moved creation and start of the subscription and streamer to the `Run()` function instead of having it in the constructor
- Got rid of `ctx` in the constructor. Moved it to the `Run()` function
- Refactored the structure of the base provider
	// set to nils in case Run() called for the second time
	p.messageIndex = counters.NewMonotonicCounter(0)
	p.blocksSinceLastMessage = 0
These seem like defensive checks against a developer incorrectly calling `Run` multiple times. If that's the concern, I think we should just ensure `Run` is only called once and skip reinitializing here.
You could use an atomic bool with a `CompareAndSwap` check done at the beginning of the function.
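A minimal sketch of that suggestion, using `atomic.Bool` from `sync/atomic` (Go 1.19+). The `provider` type and the `errAlreadyStarted` sentinel are hypothetical; how the real provider should report a second call is an open choice not settled in this thread.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errAlreadyStarted is a hypothetical sentinel error for a repeated Run call.
var errAlreadyStarted = errors.New("provider already started")

// provider is a hypothetical stand-in for a data provider.
type provider struct {
	started atomic.Bool
}

// Run must only be called once. CompareAndSwap flips started from false to
// true atomically, so exactly one caller gets past the guard even if several
// goroutines call Run at the same time.
func (p *provider) Run() error {
	if !p.started.CompareAndSwap(false, true) {
		return errAlreadyStarted
	}
	// ... create the subscription and stream data here ...
	return nil
}

func main() {
	p := &provider{}
	fmt.Println(p.Run()) // <nil>
	fmt.Println(p.Run()) // provider already started
}
```

With this guard in place, the counters no longer need to be reset at the top of `Run()`.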
So, this is the question of whether we allow clients to reuse a provider after a `Run()`/`Close()` pair. With the current code, we allow that and it works just fine. Do you want to restrict it?
I think we should not allow it.
		messageIndex:           counters.NewMonotonicCounter(0),
		blocksSinceLastMessage: 0,
		stateStreamApi:         stateStreamApi,
	}, nil
}

// Run starts processing the subscription for events and handles responses.
// Run starts processing the subscription for events and handles responses.
// Must only be called once
	b.doneOnce.Do(func() {
		close(b.done)
	})
	b.subscriptionState.cancelSubscriptionContext()
from #7046 (comment)
> We expect clients to call `Run()` and `Close()` from different goroutines. This makes sense because `Run()` is blocking, and clients need to call `Close()` concurrently to stop receiving data.

Since `Close()` is called in a separate goroutine, it's possible for it to be called before `subscriptionState` is set in `Run()`, e.g. if the client disconnects while subscribing. This is likely to cause occasional crashes.
I think we need to go back to passing the context during initialization so we can initialize the cancel before returning the object.
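One possible shape for that, as a sketch only: the constructor derives the cancellable context, so `cancel` is set before the object is handed to any other goroutine. The `provider` type, `newProvider`, and `closeBeforeRun` are hypothetical names, and storing the context in the struct is an assumption made here for brevity, not something the thread has agreed on.

```go
package main

import (
	"context"
	"fmt"
)

// provider is a hypothetical stand-in for a data provider.
type provider struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// newProvider derives the cancellable context up front, so cancel is never
// nil by the time another goroutine can call Close.
func newProvider(ctx context.Context) *provider {
	ctx, cancel := context.WithCancel(ctx)
	return &provider{ctx: ctx, cancel: cancel}
}

// Close is safe before, during, or after Run. A CancelFunc may be called
// multiple times; calls after the first do nothing.
func (p *provider) Close() {
	p.cancel()
}

// Run blocks until the context is cancelled. A real provider would create
// the subscription and stream data here.
func (p *provider) Run() error {
	<-p.ctx.Done()
	return nil
}

// closeBeforeRun simulates a client that disconnects while still subscribing:
// Close is called (twice) before Run has started.
func closeBeforeRun() error {
	p := newProvider(context.Background())
	p.Close()
	p.Close()
	return p.Run()
}

func main() {
	fmt.Println(closeBeforeRun()) // <nil>
}
```

Because `Close()` only touches state set in the constructor, the early-disconnect ordering no longer dereferences an unset field.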
Distinguish between `context.Canceled` errors originating from the streamer and those triggered by the DataProvider’s `Close()` method.
- `closeChan`, which is used in the `DataProvider.Close()` method to signal to `DataProvider.Run()` that users of data providers (the WebSocket controller in our case) want to finish receiving data.
- The `HandleSubscription()` function is replaced by a `run()` function that is aware of `closeChan`. I made a new function for it because `HandleSubscription()` is widely used in the access package (`HandleRPCSubscription` has 22 usages atm).
- `run()` returns nil if `closeChan` was closed. `HandleSubscription` returned `ctx.Canceled`, which led to confusion as `ctx.Canceled` could come from 2 sources (streamer and websocket controller).
- `sendResponse()` functions to make it more readable.

Closes #7040 #7047