Skip to content

Commit 7488305

Browse files
checked exceptions in stores (#1179)
* checked exceptions in stores * makes asynciter as much exception safe as it gets * introduce "SafeAsyncIter" that uses Results and limits exceptions to cancellations * adds {.push raises: [].} to errors * uses SafeAsyncIter in "listBlocks" and in "getBlockExpirations" * simplifies safeasynciter (magic of auto) * gets rid of ugly casts * tiny fix in hte way we create raising futures in tests of safeasynciter * Removes two more casts caused by using checked exceptions * adds an extended explanation of one more complex SafeAsyncIter test * adds missing "finishOnErr" param in slice constructor of SafeAsyncIter * better fix for "Error: Exception can raise an unlisted exception: Exception" error. --------- Co-authored-by: Dmitriy Ryajov <[email protected]>
1 parent bde9873 commit 7488305

38 files changed

+1057
-270
lines changed

codex/blockexchange/engine/advertiser.nim

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
## This file may not be copied, modified, or distributed except according to
88
## those terms.
99

10+
{.push raises: [].}
11+
1012
import pkg/chronos
1113
import pkg/libp2p/cid
1214
import pkg/libp2p/multicodec
@@ -81,16 +83,12 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
8183
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
8284
try:
8385
while b.advertiserRunning:
84-
try:
85-
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
86-
trace "Advertiser begins iterating blocks..."
87-
for c in cids:
88-
if cid =? await c:
89-
await b.advertiseBlock(cid)
90-
trace "Advertiser iterating blocks finished."
91-
except CatchableError as e:
92-
error "Error in advertise local store loop", error = e.msgDetail
93-
raiseAssert("Unexpected exception in advertiseLocalStoreLoop")
86+
if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
87+
trace "Advertiser begins iterating blocks..."
88+
for c in cidsIter:
89+
if cid =? await c:
90+
await b.advertiseBlock(cid)
91+
trace "Advertiser iterating blocks finished."
9492

9593
await sleepAsync(b.advertiseLocalStoreLoopSleep)
9694
except CancelledError:
@@ -126,8 +124,11 @@ proc start*(b: Advertiser) {.async: (raises: []).} =
126124

127125
trace "Advertiser start"
128126

129-
proc onBlock(cid: Cid) {.async.} =
130-
await b.advertiseBlock(cid)
127+
proc onBlock(cid: Cid) {.async: (raises: []).} =
128+
try:
129+
await b.advertiseBlock(cid)
130+
except CancelledError:
131+
trace "Cancelled advertise block", cid
131132

132133
doAssert(b.localStore.onBlockStored.isNone())
133134
b.localStore.onBlockStored = onBlock.some

codex/blockexchange/engine/engine.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,8 @@ proc downloadInternal(
202202
trace "Block download cancelled"
203203
if not handle.finished:
204204
await handle.cancelAndWait()
205-
except CatchableError as exc:
206-
warn "Error downloadloading block", exc = exc.msg
205+
except RetriesExhaustedError as exc:
206+
warn "Retries exhausted for block", address, exc = exc.msg
207207
if not handle.finished:
208208
handle.fail(exc)
209209
finally:
@@ -309,7 +309,7 @@ proc cancelBlocks(
309309
return peerCtx
310310

311311
try:
312-
let (succeededFuts, failedFuts) = await allFinishedFailed(
312+
let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx](
313313
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
314314
processPeer
315315
)

codex/codex.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ proc start*(s: CodexServer) {.async.} =
177177
proc stop*(s: CodexServer) {.async.} =
178178
notice "Stopping codex node"
179179

180-
let res = await noCancel allFinishedFailed(
180+
let res = await noCancel allFinishedFailed[void](
181181
@[
182182
s.restServer.stop(),
183183
s.codexNode.switch.stop(),

codex/errors.nim

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
## This file may not be copied, modified, or distributed except according to
88
## those terms.
99

10+
{.push raises: [].}
11+
1012
import std/options
1113
import std/sugar
1214
import std/sequtils
@@ -45,7 +47,7 @@ func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
4547
T.failure("Option is None")
4648
4749
proc allFinishedFailed*[T](
48-
futs: seq[Future[T]]
50+
futs: auto
4951
): Future[FinishedFailed[T]] {.async: (raises: [CancelledError]).} =
5052
## Check if all futures have finished or failed
5153
##
@@ -63,7 +65,7 @@ proc allFinishedFailed*[T](
6365
return res
6466
6567
proc allFinishedValues*[T](
66-
futs: seq[Future[T]]
68+
futs: auto
6769
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
6870
## If all futures have finished, return corresponding values,
6971
## otherwise return failure

codex/node.nim

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ type
7878
CodexNodeRef* = ref CodexNode
7979

8080
OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].}
81-
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
81+
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
82+
gcsafe, async: (raises: [CancelledError])
83+
.}
8284

8385
func switch*(self: CodexNodeRef): Switch =
8486
return self.switch
@@ -109,7 +111,9 @@ proc storeManifest*(
109111

110112
success blk
111113

112-
proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
114+
proc fetchManifest*(
115+
self: CodexNodeRef, cid: Cid
116+
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
113117
## Fetch and decode a manifest block
114118
##
115119

@@ -144,7 +148,7 @@ proc connect*(
144148

145149
proc updateExpiry*(
146150
self: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970
147-
): Future[?!void] {.async.} =
151+
): Future[?!void] {.async: (raises: [CancelledError]).} =
148152
without manifest =? await self.fetchManifest(manifestCid), error:
149153
trace "Unable to fetch manifest for cid", manifestCid
150154
return failure(error)
@@ -154,7 +158,7 @@ proc updateExpiry*(
154158
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
155159
)
156160

157-
let res = await allFinishedFailed(ensuringFutures)
161+
let res = await allFinishedFailed[?!void](ensuringFutures)
158162
if res.failure.len > 0:
159163
trace "Some blocks failed to update expiry", len = res.failure.len
160164
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
@@ -172,7 +176,7 @@ proc fetchBatched*(
172176
batchSize = DefaultFetchBatch,
173177
onBatch: BatchProc = nil,
174178
fetchLocal = true,
175-
): Future[?!void] {.async, gcsafe.} =
179+
): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} =
176180
## Fetch blocks in batches of `batchSize`
177181
##
178182

@@ -190,7 +194,10 @@ proc fetchBatched*(
190194
if not (await address in self.networkStore) or fetchLocal:
191195
self.networkStore.getBlock(address)
192196

193-
without blockResults =? await allFinishedValues(blockFutures), err:
197+
if blockFutures.len == 0:
198+
continue
199+
200+
without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
194201
trace "Some blocks failed to fetch", err = err.msg
195202
return failure(err)
196203
@@ -215,7 +222,7 @@ proc fetchBatched*(
215222
batchSize = DefaultFetchBatch,
216223
onBatch: BatchProc = nil,
217224
fetchLocal = true,
218-
): Future[?!void] =
225+
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
219226
## Fetch manifest in batches of `batchSize`
220227
##
221228
@@ -240,16 +247,16 @@ proc fetchDatasetAsync*(
240247
error "Unable to fetch blocks", err = err.msg
241248
except CancelledError as exc:
242249
trace "Cancelled fetching blocks", exc = exc.msg
243-
except CatchableError as exc:
244-
error "Error fetching blocks", exc = exc.msg
245250
246251
proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
247252
## Start fetching a dataset in the background.
248253
## The task will be tracked and cleaned up on node shutdown.
249254
##
250255
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
251256
252-
proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
257+
proc streamSingleBlock(
258+
self: CodexNodeRef, cid: Cid
259+
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
253260
## Streams the contents of a single block.
254261
##
255262
trace "Streaming single block", cid = cid
@@ -264,15 +271,17 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async
264271
defer:
265272
await stream.pushEof()
266273
await stream.pushData(blk.data)
267-
except CatchableError as exc:
274+
except CancelledError as exc:
275+
trace "Streaming block cancelled", cid, exc = exc.msg
276+
except LPStreamError as exc:
268277
trace "Unable to send block", cid, exc = exc.msg
269278
270279
self.trackedFutures.track(streamOneBlock())
271280
LPStream(stream).success
272281
273282
proc streamEntireDataset(
274283
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid
275-
): Future[?!LPStream] {.async.} =
284+
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
276285
## Streams the contents of the entire dataset described by the manifest.
277286
##
278287
trace "Retrieving blocks from manifest", manifestCid
@@ -294,14 +303,14 @@ proc streamEntireDataset(
294303

295304
jobs.add(erasureJob())
296305

297-
jobs.add(self.fetchDatasetAsync(manifest))
306+
jobs.add(self.fetchDatasetAsync(manifest, fetchLocal = false))
298307

299308
# Monitor stream completion and cancel background jobs when done
300309
proc monitorStream() {.async: (raises: []).} =
301310
try:
302311
await stream.join()
303-
except CatchableError as exc:
304-
warn "Stream failed", exc = exc.msg
312+
except CancelledError as exc:
313+
warn "Stream cancelled", exc = exc.msg
305314
finally:
306315
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
307316

@@ -314,7 +323,7 @@ proc streamEntireDataset(
314323

315324
proc retrieve*(
316325
self: CodexNodeRef, cid: Cid, local: bool = true
317-
): Future[?!LPStream] {.async.} =
326+
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
318327
## Retrieve by Cid a single block or an entire dataset described by manifest
319328
##
320329

@@ -470,11 +479,11 @@ proc store*(
470479
return manifestBlk.cid.success
471480

472481
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
473-
without cids =? await self.networkStore.listBlocks(BlockType.Manifest):
482+
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
474483
warn "Failed to listBlocks"
475484
return
476485

477-
for c in cids:
486+
for c in cidsIter:
478487
if cid =? await c:
479488
without blk =? await self.networkStore.getBlock(cid):
480489
warn "Failed to get manifest block by cid", cid
@@ -617,7 +626,7 @@ proc onStore(
617626
slotIdx: uint64,
618627
blocksCb: BlocksCb,
619628
isRepairing: bool = false,
620-
): Future[?!void] {.async.} =
629+
): Future[?!void] {.async: (raises: [CancelledError]).} =
621630
## store data in local storage
622631
##
623632

@@ -648,13 +657,15 @@ proc onStore(
648657
trace "Slot index not in manifest", slotIdx
649658
return failure(newException(CodexError, "Slot index not in manifest"))
650659

651-
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
660+
proc updateExpiry(
661+
blocks: seq[bt.Block]
662+
): Future[?!void] {.async: (raises: [CancelledError]).} =
652663
trace "Updating expiry for blocks", blocks = blocks.len
653664

654665
let ensureExpiryFutures =
655666
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
656667

657-
let res = await allFinishedFailed(ensureExpiryFutures)
668+
let res = await allFinishedFailed[?!void](ensureExpiryFutures)
658669
if res.failure.len > 0:
659670
trace "Some blocks failed to update expiry", len = res.failure.len
660671
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
@@ -702,7 +713,7 @@ proc onStore(
702713

703714
proc onProve(
704715
self: CodexNodeRef, slot: Slot, challenge: ProofChallenge
705-
): Future[?!Groth16Proof] {.async.} =
716+
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =
706717
## Generats a proof for a given slot and challenge
707718
##
708719

@@ -758,7 +769,7 @@ proc onProve(
758769

759770
proc onExpiryUpdate(
760771
self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970
761-
): Future[?!void] {.async.} =
772+
): Future[?!void] {.async: (raises: [CancelledError]).} =
762773
return await self.updateExpiry(rootCid, expiry)
763774

764775
proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: uint64) =
@@ -781,12 +792,12 @@ proc start*(self: CodexNodeRef) {.async.} =
781792
slot: uint64,
782793
onBatch: BatchProc,
783794
isRepairing: bool = false,
784-
): Future[?!void] =
795+
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
785796
self.onStore(request, slot, onBatch, isRepairing)
786797

787798
hostContracts.sales.onExpiryUpdate = proc(
788799
rootCid: Cid, expiry: SecondsSince1970
789-
): Future[?!void] =
800+
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
790801
self.onExpiryUpdate(rootCid, expiry)
791802

792803
hostContracts.sales.onClear = proc(request: StorageRequest, slotIndex: uint64) =
@@ -795,7 +806,7 @@ proc start*(self: CodexNodeRef) {.async.} =
795806

796807
hostContracts.sales.onProve = proc(
797808
slot: Slot, challenge: ProofChallenge
798-
): Future[?!Groth16Proof] =
809+
): Future[?!Groth16Proof] {.async: (raw: true, raises: [CancelledError]).} =
799810
# TODO: generate proof
800811
self.onProve(slot, challenge)
801812

codex/rest/api.nim

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ proc retrieveCid(
7878
## manner
7979
##
8080

81-
var stream: LPStream
81+
var lpStream: LPStream
8282

8383
var bytes = 0
8484
try:
@@ -94,6 +94,8 @@ proc retrieveCid(
9494
await resp.sendBody(error.msg)
9595
return
9696

97+
lpStream = stream
98+
9799
# It is ok to fetch again the manifest because it will hit the cache
98100
without manifest =? (await node.fetchManifest(cid)), err:
99101
error "Failed to fetch manifest", err = err.msg
@@ -139,15 +141,15 @@ proc retrieveCid(
139141
codex_api_downloads.inc()
140142
except CancelledError as exc:
141143
raise exc
142-
except CatchableError as exc:
144+
except LPStreamError as exc:
143145
warn "Error streaming blocks", exc = exc.msg
144146
resp.status = Http500
145147
if resp.isPending():
146148
await resp.sendBody(exc.msg)
147149
finally:
148150
info "Sent bytes", cid = cid, bytes
149-
if not stream.isNil:
150-
await stream.close()
151+
if not lpStream.isNil:
152+
await lpStream.close()
151153

152154
proc buildCorsHeaders(
153155
httpMethod: string, allowedOrigin: Option[string]

codex/sales/salescontext.nim

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ type
2424
slotQueue*: SlotQueue
2525
simulateProofFailures*: int
2626

27-
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
27+
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.
28+
gcsafe, async: (raises: [CancelledError])
29+
.}
2830
OnStore* = proc(
2931
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
30-
): Future[?!void] {.gcsafe, upraises: [].}
32+
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
3133
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
32-
gcsafe, upraises: []
34+
gcsafe, async: (raises: [CancelledError])
3335
.}
3436
OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {.
35-
gcsafe, upraises: []
37+
gcsafe, async: (raises: [CancelledError])
3638
.}
37-
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
38-
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
39+
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
40+
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}

codex/sales/states/downloading.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ method run*(
5555
reservationId = reservation.id
5656
availabilityId = reservation.availabilityId
5757

58-
proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} =
58+
proc onBlocks(
59+
blocks: seq[bt.Block]
60+
): Future[?!void] {.async: (raises: [CancelledError]).} =
5961
# release batches of blocks as they are written to disk and
6062
# update availability size
6163
var bytes: uint = 0

0 commit comments

Comments
 (0)