-
Notifications
You must be signed in to change notification settings - Fork 31
checked exceptions in stores #1179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
dd3b320
to
a1b3846
Compare
91770af
to
1694a76
Compare
1694a76
to
4d74fff
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this looks great, thanks for puting in the work!
Ok, I think it is safe to replace AsyncIter
with SafeAsyncIter
as it stands. This might happen in a follow up PR, to keep things moving.
Other than that and a few questions I have, this looks good.
@@ -63,7 +65,7 @@ proc allFinishedFailed*[T]( | |||
return res | |||
|
|||
proc allFinishedValues*[T]( | |||
futs: seq[Future[T]] | |||
futs: auto |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, why auto
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to support "raising" futures as arguments of generic procs. I cover it in the section "Generic Types" in the presentation I was showing during offsite: https://drive.proton.me/urls/Q562MNN8VG#6iy1sTyjagcW.
We cannot explicitly specify a generic raising future type as an argument and this function is used in fetchBatched
(codex/node.nim
):
while not iter.finished:
let blockFutures = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
if blockFutures.len == 0:
continue
without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)
Here, blockFutures
is a sequence of raising futures: seq[InternalRaisesFuture[Result[blocktype.Block, ref CatchableError], (CancelledError,)]]
. If we had seq[Future[T]]
instead of auto
in allFinishedValues
, we would get the following compilation error:
Error: type mismatch: got <seq[InternalRaisesFuture[Result[blocktype.Block, ref CatchableError], (CancelledError,)]]>
but expected one of:
proc (futs: seq[Future[Result[blocktype.Block, ref CatchableError]]]): InternalRaisesFuture[Result[seq[Result[blocktype.Block, ref CatchableError]], ref CatchableError], (CancelledError,)]{.raises: [], gcsafe.}
stack trace: (most recent call last)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is fine, but 🤔
some(cid), | ||
) | ||
) | ||
success(iter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we need the map anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The map
was just converting the yielded value to an optional because advertiser expects an Option[Cid]
to be yielded. SafeAsyncIter
yields a Result
and so it can be directly used by the advertiser, and so mapping is not needed anymore. Also, in the original version, mapping was happening after filtering, and if there were something unexpected happening while iterating over self.cache.keys
it would result in an exception. With SafeAsyncIter
, if there is any exception while iterating it will result in a failure, which would then be detected by the advertiser. Because by default SafeAsyncIter
stops on failure, we maintain the same semantics as before where an exception would interrupt the flow.
proc mapFilter*[T, U]( | ||
iter: SafeAsyncIter[T], | ||
mapPredicate: SafeFunction[?!T, Option[?!U]], | ||
finishOnErr: bool = true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this flag now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean finishOnErr
? If so, this is because, previously an exception would be thrown if yielding next item raises. In SafeAsyncIter
, next
is not allowed to throw anything but CancelledError
and the error is communicated by returning failure
. This flag allows the user to choose what to do with that failure - should the failure be just yielded and allow iterator to continue, or shall it stop (the default).
I think this is good to go right now. |
Chasing one more approval... I asked @gmega, will see @markspanbroek tomorrow... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this a lot, thanks @marcinczenko! Just a few comments below.
) | ||
|
||
proc cancelFut(): Future[void] {.async.} = | ||
await sleepAsync(100.millis) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not sure if we want to make this non-deterministic with a sleep
, perhaps it's better to use another future for signaling, which makes things less prone to intermittent failures?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, perhaps sleep is a bit too risky in that test. There will be follow-up on removing remaining references to the old async iter, and then I get back to it and use something more deterministic. I will then merge this one.
I think we can merge this, but I would address the non-deterministic sleep in #1179 (comment) (could be a follow up PR). |
While working on BitTorrent stuff and trying to keep exceptions checked, I continuously hit the wall of set of "stores" which mostly do not use anything but
.async.
.This PR adds checked exceptions to all stores and dependent abstractions.
To keep the PR still maintainable it focuses on the "stores", thus it has pretty much a minimal set of changes to get them compile.
Most of the changes are easy and syntactical, but there were a few exceptions where it was a big pain to move forward.
It all boils down to
AsynIter
, most probably created beforeasync: (raises: [CancelledError])
era, that makes checking exceptions much harder where it is used.The only way to get it clean was for me to create a version of AsyncIter (called
SafeAsyncIter
) that usesResults
and only ever allows forasync: (raises: [CancelledError])
. I applied it tolistBlocks
andgetBlockExpirations
and doing this I could remove problematic situations (especially in block maintenance).SafeAsynIter
has the same API asAsynIter
and requires very little work (pretty much syntactical) to use it in place ofAsyncIter
, which is still used in some other places, which can be dealt with in a separate PR to keep this one small.To get some insights about challenges that you may encounter when working with checked (raising) exception, you may like to check the following note: https://publish.obsidian.md/bittorrent/10+Notes/Async+checked+(raising)+exceptions.