Skip to content

store: performance improvements for repo store #1173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ logScope:
topics = "codex node"

const DefaultFetchBatch = 10
const DefaultLeafBatch = 100

type
Contracts* =
Expand Down Expand Up @@ -345,19 +346,9 @@ proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.}
without manifest =? Manifest.decode(manifestBlock), err:
return failure(err)

let runtimeQuota = initDuration(milliseconds = 100)
var lastIdle = getTime()
for i in 0 ..< manifest.blocksCount:
if (getTime() - lastIdle) >= runtimeQuota:
await idleAsync()
lastIdle = getTime()

if err =? (await store.delBlock(manifest.treeCid, i)).errorOption:
# The contract for delBlock is fuzzy, but we assume that if the block is
# simply missing we won't get an error. This is a best effort operation and
# can simply be retried.
error "Failed to delete block within dataset", index = i, err = err.msg
return failure(err)
if err =? (await store.delBlocks(manifest.treeCid, manifest.blocksCount)).errorOption:
error "Error deleting blocks", err = err.msg
return failure(err)

if err =? (await store.delBlock(cid)).errorOption:
error "Error deleting manifest block", err = err.msg
Expand Down Expand Up @@ -399,6 +390,8 @@ proc store*(
dataCodec = BlockCodec
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)

var proofs = newSeq[CodexProof]()

var cids: seq[Cid]

try:
Expand Down Expand Up @@ -430,13 +423,23 @@ proc store*(
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
return failure(err)

var
batch = newSeq[(Cid, CodexProof)]()
batchStartIndex = 0

for index, cid in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =?
(await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)
batch.add((cid, proof))

if batch.len >= DefaultLeafBatch or index == cids.len - 1:
if err =? (
await self.networkStore.putCidAndProofBatch(treeCid, batchStartIndex, batch)
).errorOption:
# TODO add log here
return failure(err)
batch.setLen(0)
batchStartIndex = index + 1

let manifest = Manifest.new(
treeCid = treeCid,
Expand Down
2 changes: 1 addition & 1 deletion codex/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
let json =
%RestRepoStore(
totalBlocks: repoStore.totalBlocks,
totalBlocks: repoStore.storageStats.totalBlocks,
quotaMaxBytes: repoStore.quotaMaxBytes,
quotaUsedBytes: repoStore.quotaUsedBytes,
quotaReservedBytes: repoStore.quotaReservedBytes,
Expand Down
15 changes: 11 additions & 4 deletions codex/slots/builder/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ proc buildSlot*[T, H](
slotIndex = slotIndex

trace "Building slot tree"
var
batch = newSeq[(Cid, CodexProof)]()
batchStartIndex = 0

without tree =? (await self.buildSlotTree(slotIndex)) and
treeCid =? tree.root .? toSlotCid, err:
Expand All @@ -240,10 +243,14 @@ proc buildSlot*[T, H](
error "Failed to get proof for slot tree", err = err.msg
return failure(err)

if err =?
(await self.store.putCidAndProof(treeCid, i, cellCid, encodableProof)).errorOption:
error "Failed to store slot tree", err = err.msg
return failure(err)
batch.add((cellCid, encodableProof))

if batch.len >= 50 or i == tree.leaves.len - 1:
if err =? (await self.store.putCidAndProofBatch(treeCid, batchStartIndex, batch)).errorOption:
error "Failed to store slot tree", err = err.msg
return failure(err)
batch.setLen(0)
batchStartIndex = i + 1

tree.root()

Expand Down
16 changes: 16 additions & 0 deletions codex/stores/blockstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ method putCidAndProof*(

raiseAssert("putCidAndProof not implemented!")

method putCidAndProofBatch*(
    self: BlockStore,
    treeCid: Cid,
    startIndex: int,
    entries: seq[(Cid, CodexProof)],
): Future[?!void] {.base, gcsafe.} =
  ## Base declaration for storing a batch of `(cid, proof)` entries that
  ## belong to the tree identified by `treeCid`.
  ##
  ## This base version has no implementation and always fails with an
  ## assertion; concrete stores are expected to override it.
  ##
  ## NOTE(review): `startIndex` is presumably the tree index of the first
  ## element of `entries` - confirm against the overriding store.
  ##

  raiseAssert "putCidAndProofBatch not implemented!"

method getCidAndProof*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] {.base, gcsafe.} =
Expand Down Expand Up @@ -127,6 +135,14 @@ method delBlock*(

raiseAssert("delBlock not implemented!")

method delBlocks*(
    self: BlockStore, treeCid: Cid, count: int
): Future[?!void] {.base, gcsafe.} =
  ## Base declaration for deleting multiple blocks of the tree identified
  ## by `treeCid` from the blockstore.
  ##
  ## This base version has no implementation and always fails with an
  ## assertion; concrete stores are expected to override it.
  ##
  ## NOTE(review): `count` is presumably the number of leaves to delete,
  ## starting at index 0 - confirm against the overriding store.
  ##

  # The message names this method (`delBlocks`), not the single-block
  # `delBlock`, so a failed assertion points at the right missing override.
  raiseAssert("delBlocks not implemented!")

method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} =
## Check if the block exists in the blockstore
##
Expand Down
12 changes: 12 additions & 0 deletions codex/stores/networkstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ method putBlock*(
await self.engine.resolveBlocks(@[blk])
return success()

method putCidAndProofBatch*(
    self: NetworkStore,
    treeCid: Cid,
    startIndex: int,
    entries: seq[(Cid, CodexProof)],
): Future[?!void] =
  ## Store a batch of `(cid, proof)` entries for `treeCid` by forwarding the
  ## call, with its arguments unchanged, to the local store.
  ##

  return self.localStore.putCidAndProofBatch(treeCid, startIndex, entries)

method putCidAndProof*(
self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
): Future[?!void] =
Expand Down Expand Up @@ -128,6 +133,13 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
trace "Deleting block from network store", cid
return self.localStore.delBlock(cid)

method delBlocks*(
    self: NetworkStore, treeCid: Cid, count: int
): Future[?!void] =
  ## Delete multiple blocks of the tree identified by `treeCid`.
  ##
  ## Emits a trace log entry and then forwards the call, with its arguments
  ## unchanged, to the local store; the local store's future is returned
  ## as-is.
  ##

  trace "Deleting blocks from network store", treeCid
  self.localStore.delBlocks(treeCid, count)

{.pop.}

method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
Expand Down
4 changes: 2 additions & 2 deletions codex/stores/repostore/coders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import ../../errors
import ../../merkletree
import ../../utils/json

proc encode*(t: QuotaUsage): seq[byte] =
proc encode*(t: StorageStats): seq[byte] =
t.toJson().toBytes()

proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T =
proc decode*(T: type StorageStats, bytes: seq[byte]): ?!T =
T.fromJson(bytes)

proc encode*(t: BlockMetadata): seq[byte] =
Expand Down
110 changes: 75 additions & 35 deletions codex/stores/repostore/operations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -82,57 +82,97 @@ proc getLeafMetadata*(

success(leafMd)

proc updateTotalBlocksCount*(
self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
): Future[?!void] {.async.} =
await self.metaDs.modify(
CodexTotalBlocksKey,
proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
currCount + plusCount - minusCount
else:
plusCount - minusCount

self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
count.some,
)

proc updateQuotaUsage*(
proc updateQuotaAndBlockCount*(
self: RepoStore,
plusCount: Natural = 0,
minusCount: Natural = 0,
plusUsed: NBytes = 0.NBytes,
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes,
): Future[?!void] {.async.} =
await self.metaDs.modify(
QuotaUsedKey,
proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
var usage: QuotaUsage

if currUsage =? maybeCurrUsage:
usage = QuotaUsage(
used: currUsage.used + plusUsed - minusUsed,
reserved: currUsage.reserved + plusReserved - minusReserved,
CodexTotalBlocksKey,
proc(maybeCurrStats: ?StorageStats): Future[?StorageStats] {.async.} =
var stats: StorageStats
if currStats =? maybeCurrStats:
stats = StorageStats(
quotaUsed: currStats.quotaUsed + plusUsed - minusUsed,
quotaReserved: currStats.quotaReserved + plusReserved - minusReserved,
totalBlocks: currStats.totalBlocks + plusCount - minusCount,
)
else:
usage =
QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
stats = StorageStats(
quotaUsed: plusUsed - minusUsed,
quotaReserved: plusReserved - minusReserved,
totalBlocks: plusCount - minusCount,
)

if usage.used + usage.reserved > self.quotaMaxBytes:
if stats.quotaUsed + stats.quotaReserved > self.quotaMaxBytes:
raise newException(
QuotaNotEnoughError,
"Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
$usage.reserved & ", limit: " & $self.quotaMaxBytes,
"Quota usage would exceed the limit. Used: " & $stats.quotaUsed &
", reserved: " & $stats.quotaReserved & ", limit: " & $self.quotaMaxBytes,
)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some,
self.storageStats = stats
codex_repostore_bytes_used.set(stats.quotaUsed.int64)
codex_repostore_bytes_reserved.set(stats.quotaReserved.int64)
codex_repostore_blocks.set(stats.totalBlocks.int64)
return stats.some,
)

# proc updateTotalBlocksCount*(
# self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
# ): Future[?!void] {.async.} =
# await self.metaDs.modify(
# CodexTotalBlocksKey,
# proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
# let count: Natural =
# if currCount =? maybeCurrCount:
# currCount + plusCount - minusCount
# else:
# plusCount - minusCount

# self.totalBlocks = count
# codex_repostore_blocks.set(count.int64)
# count.some,
# )

# proc updateQuotaUsage*(
# self: RepoStore,
# plusUsed: NBytes = 0.NBytes,
# minusUsed: NBytes = 0.NBytes,
# plusReserved: NBytes = 0.NBytes,
# minusReserved: NBytes = 0.NBytes,
# ): Future[?!void] {.async.} =
# await self.metaDs.modify(
# QuotaUsedKey,
# proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
# var usage: QuotaUsage

# if currUsage =? maybeCurrUsage:
# usage = QuotaUsage(
# used: currUsage.used + plusUsed - minusUsed,
# reserved: currUsage.reserved + plusReserved - minusReserved,
# )
# else:
# usage =
# QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)

# if usage.used + usage.reserved > self.quotaMaxBytes:
# raise newException(
# QuotaNotEnoughError,
# "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
# $usage.reserved & ", limit: " & $self.quotaMaxBytes,
# )
# else:
# self.quotaUsage = usage
# codex_repostore_bytes_used.set(usage.used.int64)
# codex_repostore_bytes_reserved.set(usage.reserved.int64)
# return usage.some,
# )

proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,
Expand Down
Loading