Skip to content

Commit f5ea783

Browse files
committed
uses SafeAsyncIter in "listBlocks" and in "getBlockExpirations"
1 parent ecf515d commit f5ea783

File tree

15 files changed

+178
-129
lines changed

15 files changed

+178
-129
lines changed

codex/blockexchange/engine/advertiser.nim

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
## This file may not be copied, modified, or distributed except according to
88
## those terms.
99

10+
{.push raises: [].}
11+
1012
import pkg/chronos
1113
import pkg/libp2p/cid
1214
import pkg/libp2p/multicodec
@@ -81,16 +83,12 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
8183
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
8284
try:
8385
while b.advertiserRunning:
84-
try:
85-
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
86-
trace "Advertiser begins iterating blocks..."
87-
for c in cids:
88-
if cid =? await c:
89-
await b.advertiseBlock(cid)
90-
trace "Advertiser iterating blocks finished."
91-
except CatchableError as e:
92-
error "Error in advertise local store loop", error = e.msgDetail
93-
raiseAssert("Unexpected exception in advertiseLocalStoreLoop")
86+
if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
87+
trace "Advertiser begins iterating blocks..."
88+
for c in cidsIter:
89+
if cid =? (await cast[Future[?!Cid].Raising([CancelledError])](c)):
90+
await b.advertiseBlock(cid)
91+
trace "Advertiser iterating blocks finished."
9492

9593
await sleepAsync(b.advertiseLocalStoreLoopSleep)
9694
except CancelledError:

codex/node.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
485485
return
486486

487487
for c in cidsIter:
488-
if cid =? await c:
488+
if cid =? (await cast[Future[?!Cid].Raising([CancelledError])](c)):
489489
without blk =? await self.networkStore.getBlock(cid):
490490
warn "Failed to get manifest block by cid", cid
491491
return

codex/stores/blockstore.nim

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
## This file may not be copied, modified, or distributed except according to
88
## those terms.
99

10-
import pkg/upraises
11-
12-
push:
13-
{.upraises: [].}
10+
{.push raises: [].}
1411

1512
import pkg/chronos
1613
import pkg/libp2p
@@ -152,7 +149,7 @@ method hasBlock*(
152149

153150
method listBlocks*(
154151
self: BlockStore, blockType = BlockType.Manifest
155-
): Future[?!AsyncIter[?Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} =
152+
): Future[?!SafeAsyncIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} =
156153
## Get the list of blocks in the BlockStore. This is an intensive operation
157154
##
158155

codex/stores/cachestore.nim

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
## This file may not be copied, modified, or distributed except according to
88
## those terms.
99

10-
import pkg/upraises
11-
12-
push:
13-
{.upraises: [].}
10+
{.push raises: [].}
1411

1512
import std/options
1613

@@ -142,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe.}) =
142139

143140
method listBlocks*(
144141
self: CacheStore, blockType = BlockType.Manifest
145-
): Future[?!AsyncIter[?Cid]] {.async: (raises: [CancelledError]).} =
142+
): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} =
146143
## Get the list of blocks in the BlockStore. This is an intensive operation
147144
##
148145

@@ -151,36 +148,33 @@ method listBlocks*(
151148
proc isFinished(): bool =
152149
return finished(cids)
153150

154-
proc genNext(): Future[Cid] {.async.} =
155-
cids()
156-
157-
try:
158-
let iter = await (
159-
AsyncIter[Cid].new(genNext, isFinished).filter(
160-
proc(cid: Cid): Future[bool] {.async.} =
161-
without isManifest =? cid.isManifest, err:
162-
trace "Error checking if cid is a manifest", err = err.msg
163-
return false
164-
165-
case blockType
166-
of BlockType.Both:
167-
return true
168-
of BlockType.Manifest:
169-
return isManifest
170-
of BlockType.Block:
171-
return not isManifest
172-
)
173-
)
174-
175-
return success(
176-
map[Cid, ?Cid](
177-
iter,
178-
proc(cid: Cid): Future[?Cid] {.async.} =
179-
some(cid),
180-
)
151+
proc genNext(): Future[?!Cid] {.async: (raises: [CancelledError]).} =
152+
try:
153+
let cid = cids()
154+
success(cid)
155+
except Exception as err:
156+
failure(err.msg)
157+
158+
let iter = await (
159+
SafeAsyncIter[Cid].new(genNext, isFinished).filter(
160+
proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} =
161+
without cid =? cid, err:
162+
trace "Cannot get Cid from the iterator", err = err.msg
163+
return false
164+
without isManifest =? cid.isManifest, err:
165+
trace "Error checking if cid is a manifest", err = err.msg
166+
return false
167+
168+
case blockType
169+
of BlockType.Both:
170+
return true
171+
of BlockType.Manifest:
172+
return isManifest
173+
of BlockType.Block:
174+
return not isManifest
181175
)
182-
except CatchableError as err:
183-
raiseAssert err.msg
176+
)
177+
success(iter)
184178

185179
func putBlockSync(self: CacheStore, blk: Block): bool =
186180
let blkSize = blk.data.len.NBytes # in bytes

codex/stores/maintenance.nim

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
## Store maintenance module
1111
## Looks for and removes expired blocks from blockstores.
1212

13+
{.push raises: [].}
14+
1315
import pkg/chronos
1416
import pkg/questionable
1517
import pkg/questionable/results
1618

1719
import ./repostore
1820
import ../utils/timer
19-
import ../utils/asynciter
21+
import ../utils/safeasynciter
2022
import ../clock
2123
import ../logutils
2224
import ../systemclock
@@ -81,29 +83,28 @@ proc runBlockCheck(
8183

8284
var numberReceived = 0
8385
for beFut in iter:
84-
try:
85-
let be = await beFut
86-
inc numberReceived
87-
await self.processBlockExpiration(be)
88-
await sleepAsync(1.millis) # cooperative scheduling
89-
except CatchableError as err:
90-
raiseAssert err.msg
86+
let beRes = await cast[Future[?!BlockExpiration].Raising([CancelledError])](beFut)
87+
without be =? beRes, err:
88+
trace "Unable to obtain blockExpiration from iterator"
89+
continue
90+
inc numberReceived
91+
await self.processBlockExpiration(be)
92+
await sleepAsync(1.millis) # cooperative scheduling
9193

9294
# If we received fewer blockExpirations from the iterator than we asked for,
9395
# We're at the end of the dataset and should start from 0 next time.
9496
if numberReceived < self.numberOfBlocksPerInterval:
9597
self.offset = 0
9698

9799
proc start*(self: BlockMaintainer) =
98-
proc onTimer(): Future[void] {.async.} =
100+
proc onTimer(): Future[void] {.async: (raises: []).} =
99101
try:
100102
await self.runBlockCheck()
101-
except CancelledError as error:
102-
raise error
103-
except CatchableError as exc:
104-
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg = exc.msg
103+
except CancelledError as err:
104+
trace "Running block check in block maintenance timer callback cancelled: ",
105+
err = err.msg
105106

106107
self.timer.start(onTimer, self.interval)
107108

108-
proc stop*(self: BlockMaintainer): Future[void] {.async.} =
109+
proc stop*(self: BlockMaintainer): Future[void] {.async: (raises: []).} =
109110
await self.timer.stop()

codex/stores/networkstore.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import ../blockexchange
1919
import ../logutils
2020
import ../merkletree
2121
import ../utils/asyncheapqueue
22-
import ../utils/asynciter
22+
import ../utils/safeasynciter
2323
import ./blockstore
2424

2525
export blockstore, blockexchange, asyncheapqueue
@@ -124,7 +124,7 @@ method ensureExpiry*(
124124

125125
method listBlocks*(
126126
self: NetworkStore, blockType = BlockType.Manifest
127-
): Future[?!AsyncIter[?Cid]] {.async: (raw: true, raises: [CancelledError]).} =
127+
): Future[?!SafeAsyncIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} =
128128
self.localStore.listBlocks(blockType)
129129

130130
method delBlock*(

codex/stores/queryiterhelper.nim

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import pkg/chronicles
55
import pkg/datastore/typedds
66

77
import ../utils/asynciter
8+
import ../utils/safeasynciter
9+
10+
{.push raises: [].}
811

912
type KeyVal*[T] = tuple[key: Key, value: T]
1013

@@ -42,6 +45,40 @@ proc toAsyncIter*[T](
4245

4346
AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success
4447

48+
proc toSafeAsyncIter*[T](
49+
queryIter: QueryIter[T], finishOnErr: bool = true
50+
): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} =
51+
## Converts `QueryIter[T]` to `SafeAsyncIter[QueryResponse[T]]` and automatically
52+
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
53+
## if the flag finishOnErr is set to true)
54+
##
55+
56+
if queryIter.finished:
57+
trace "Disposing iterator"
58+
if error =? (await queryIter.dispose()).errorOption:
59+
return failure(error)
60+
return success(SafeAsyncIter[QueryResponse[T]].empty())
61+
62+
var errOccurred = false
63+
64+
proc genNext(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} =
65+
let queryResOrErr = await queryIter.next()
66+
67+
if queryResOrErr.isErr:
68+
errOccurred = true
69+
70+
if queryIter.finished or (errOccurred and finishOnErr):
71+
trace "Disposing iterator"
72+
if error =? (await queryIter.dispose()).errorOption:
73+
return failure(error)
74+
75+
return queryResOrErr
76+
77+
proc isFinished(): bool =
78+
queryIter.finished
79+
80+
SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success
81+
4582
proc filterSuccess*[T](
4683
iter: AsyncIter[?!QueryResponse[T]]
4784
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async: (raises: [CancelledError]).} =
@@ -62,7 +99,30 @@ proc filterSuccess*[T](
6299

63100
(key: key, value: value).some
64101

65-
try:
66-
await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)
67-
except CatchableError as err:
68-
raiseAssert err.msg
102+
await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)
103+
104+
proc filterSuccess*[T](
105+
iter: SafeAsyncIter[QueryResponse[T]]
106+
): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {.
107+
async: (raises: [CancelledError])
108+
.} =
109+
## Filters out any items that are not success
110+
111+
proc mapping(
112+
resOrErr: ?!QueryResponse[T]
113+
): Future[Option[?!KeyVal[T]]] {.async: (raises: [CancelledError]).} =
114+
without res =? resOrErr, error:
115+
error "Error occurred when getting QueryResponse", msg = error.msg
116+
return Result[KeyVal[T], ref CatchableError].none
117+
118+
without key =? res.key:
119+
warn "No key for a QueryResponse"
120+
return Result[KeyVal[T], ref CatchableError].none
121+
122+
without value =? res.value, error:
123+
error "Error occurred when getting a value from QueryResponse", msg = error.msg
124+
return Result[KeyVal[T], ref CatchableError].none
125+
126+
some(success((key: key, value: value)))
127+
128+
await mapFilter[QueryResponse[T], KeyVal[T]](iter, mapping)

0 commit comments

Comments
 (0)