This repository was archived by the owner on May 14, 2022. It is now read-only.

Commit 649ec86

Merge pull request #68 from ssb-ngi-pointer/batch-delete
speed up deletes and add onDeletesFlushed()
2 parents: 0465b75 + 80b7f5f

File tree: 4 files changed, +174 -55 lines

README.md

Lines changed: 10 additions & 2 deletions

@@ -117,6 +117,14 @@ log.appendTransaction([item1, item2, item3], (err, offset3) => {
 })
 ```
 
+### Wait for all ongoing appends to be flushed to disk
+
+```js
+log.onDrain(() => {
+  // ...
+})
+```
+
 ### Scan all records as a `push-stream`
 
 ```js
@@ -160,10 +168,10 @@ log.del(offset, (err) => {
 })
 ```
 
-### Wait for all ongoing writes to complete
+### Wait for all ongoing deletes to be flushed to disk
 
 ```js
-log.onDrain(() => {
+log.onDeletesFlushed(() => {
   // ...
 })
 ```
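
Taken together, the README changes split flushing into two waits: `onDrain` waits for pending appends, while the new `onDeletesFlushed` waits for the batched delete writes. A minimal sketch of how the two new sections combine in practice (the `log` instance and the offsets are assumed to already exist; this is illustrative, not part of the diff):

```js
// Assumes `log` is an opened async-append-only-log instance and that
// offset1 and offset2 point at previously appended records.
log.del(offset1, (err) => {
  if (err) throw err
  log.del(offset2, (err) => {
    if (err) throw err
    // del() now only zeroes the record in an in-memory block copy and
    // schedules a batched write, so wait for that write to reach disk
    // before compacting.
    log.onDeletesFlushed(() => {
      log.compact((err) => {
        if (err) throw err
        // deleted records are gone and the remaining ones are shifted down
      })
    })
  })
})
```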

index.js

Lines changed: 67 additions & 22 deletions

@@ -42,7 +42,7 @@ const DEFAULT_WRITE_TIMEOUT = 250
 const DEFAULT_VALIDATE = () => true
 
 module.exports = function AsyncAppendOnlyLog(filename, opts) {
-  const cache = new Cache(1024) // this is potentially 65mb!
+  const cache = new Cache(1024) // This is potentially 64 MiB!
   const raf = RAF(filename)
   const blockSize = (opts && opts.blockSize) || DEFAULT_BLOCK_SIZE
   const codec = (opts && opts.codec) || DEFAULT_CODEC
@@ -52,7 +52,9 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
 
   const waitingLoad = []
   const waitingDrain = new Map() // blockIndex -> []
+  const waitingFlushDelete = []
   const blocksToBeWritten = new Map() // blockIndex -> { blockBuf, offset }
+  const blocksWithDeletables = new Map() // blockIndex -> blockBuf
   let writingBlockIndex = -1
 
   let latestBlockBuf = null
@@ -244,25 +246,64 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
       cb(delDuringCompactErr())
       return
     }
-    if (blocksToBeWritten.has(getBlockIndex(offset))) {
+    const blockIndex = getBlockIndex(offset)
+    if (blocksToBeWritten.has(blockIndex)) {
       onDrain(function delAfterDrained() {
         del(offset, cb)
       })
       return
     }
-    getBlock(offset, function gotBlockForDelete(err, blockBuf) {
+
+    if (blocksWithDeletables.has(blockIndex)) {
+      const blockBuf = blocksWithDeletables.get(blockIndex)
+      gotBlockForDelete(null, blockBuf)
+    } else {
+      getBlock(offset, gotBlockForDelete)
+    }
+    function gotBlockForDelete(err, blockBuf) {
       if (err) return cb(err)
-      Record.overwriteWithZeroes(blockBuf, getOffsetInBlock(offset))
-      // we write directly here to make normal write simpler
-      const blockStart = getBlockStart(offset)
-      writeWithFSync(blockStart, blockBuf, null, cb)
-    })
+      const actualBlockBuf = blocksWithDeletables.get(blockIndex) || blockBuf
+      Record.overwriteWithZeroes(actualBlockBuf, getOffsetInBlock(offset))
+      blocksWithDeletables.set(blockIndex, actualBlockBuf)
+      scheduleFlushDelete()
+      cb()
+    }
   }
 
   function hasNoSpaceFor(dataBuf, offsetInBlock) {
     return offsetInBlock + Record.size(dataBuf) + EOB.SIZE > blockSize
   }
 
+  const scheduleFlushDelete = debounce(flushDelete, writeTimeout)
+
+  function flushDelete() {
+    if (blocksWithDeletables.size === 0) {
+      for (const cb of waitingFlushDelete) cb()
+      waitingFlushDelete.length = 0
+      return
+    }
+    const blockIndex = blocksWithDeletables.keys().next().value
+    const blockStart = blockIndex * blockSize
+    const blockBuf = blocksWithDeletables.get(blockIndex)
+    blocksWithDeletables.delete(blockIndex)
+    blocksWithDeletables.set(-1, null) // indicate that flush is active
+
+    writeWithFSync(blockStart, blockBuf, null, function flushedDelete(err) {
+      blocksWithDeletables.delete(-1) // indicate that flush is not active
+      if (err) {
+        for (const cb of waitingFlushDelete) cb(err)
+        waitingFlushDelete.length = 0
+        return
+      }
+      flushDelete() // next
+    })
+  }
+
+  function onDeletesFlushed(cb) {
+    if (blocksWithDeletables.size === 0) cb()
+    else waitingFlushDelete.push(cb)
+  }
+
   function appendSingle(data) {
     let encodedData = codec.encode(data)
     if (typeof encodedData === 'string') encodedData = Buffer.from(encodedData)
@@ -439,26 +480,29 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
       return
     }
     onDrain(function startCompactAfterDrain() {
-      compaction = new Compaction(self, (err, sizeDiff) => {
-        compaction = null
-        if (err) return cb(err)
-        compactionProgress.set({ sizeDiff, percent: 1, done: true })
-        for (let i = 0, n = waitingCompaction.length; i < n; ++i) {
-          waitingCompaction[i]()
-        }
-        waitingCompaction.length = 0
-        cb()
-      })
-      compaction.progress((stats) => {
-        compactionProgress.set({ ...stats, done: false })
+      onDeletesFlushed(function startCompactAfterDeletes() {
+        compaction = new Compaction(self, (err, sizeDiff) => {
+          compaction = null
+          if (err) return cb(err)
+          compactionProgress.set({ sizeDiff, percent: 1, done: true })
+          for (const callback of waitingCompaction) callback()
+          waitingCompaction.length = 0
+          cb()
+        })
+        compaction.progress((stats) => {
+          compactionProgress.set({ ...stats, done: false })
+        })
       })
     })
   }
 
   function close(cb) {
     onDrain(function closeAfterHavingDrained() {
-      while (self.streams.length) self.streams.shift().abort(streamClosedErr())
-      raf.close(cb)
+      onDeletesFlushed(function closeAfterDeletesFlushed() {
+        for (const stream of self.streams) stream.abort(streamClosedErr())
+        self.streams = []
+        raf.close(cb)
+      })
     })
   }
 
@@ -500,6 +544,7 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
     appendTransaction: onLoad(appendTransaction),
     close: onLoad(close),
     onDrain: onLoad(onDrain),
+    onDeletesFlushed: onLoad(onDeletesFlushed),
    compact: onLoad(compact),
    since,
    compactionProgress,
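
The core of the speedup is that `del()` no longer issues one fsynced write per deleted record: it zeroes the record in an in-memory copy of its block, parks that block in `blocksWithDeletables`, and lets a debounced `flushDelete()` write the dirty blocks out one at a time (the `-1` sentinel keeps the map non-empty while a write is in flight, so `onDeletesFlushed` callbacks queue up instead of firing early). The `debounce` helper itself is not part of this diff; a minimal trailing-edge sketch of what `scheduleFlushDelete = debounce(flushDelete, writeTimeout)` relies on could look like the following (illustrative only, the real helper may differ):

```js
// Illustrative sketch, not the implementation used by index.js:
// call fn once, `wait` milliseconds after the most recent invocation.
function debounce(fn, wait) {
  let timer = null
  return function debounced(...args) {
    if (timer !== null) clearTimeout(timer)
    timer = setTimeout(() => {
      timer = null
      fn(...args)
    }, wait)
  }
}
```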

test/compaction.js

Lines changed: 21 additions & 11 deletions

@@ -57,7 +57,7 @@ tape('delete first record, compact, stream', async (t) => {
   t.pass('append two records')
 
   await run(log.del)(offset1)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete first record')
 
   const [err] = await run(log.compact)()
@@ -93,7 +93,7 @@ tape('delete last record, compact, stream', async (t) => {
   t.pass('append three records')
 
   await run(log.del)(offset3)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete third record')
 
   await new Promise((resolve) => {
@@ -150,7 +150,7 @@ tape('shift many blocks', async (t) => {
   await run(log.del)(11 + 3)
   await run(log.del)(11 + 6)
   await run(log.del)(33 + 3)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('deleted some records in the middle')
 
   await new Promise((resolve) => {
@@ -265,7 +265,7 @@ tape('cannot read truncated regions of the log', async (t) => {
   await run(log.del)(11 + 3)
   await run(log.del)(11 + 6)
   await run(log.del)(22 + 0)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete some records')
 
   await new Promise((resolve) => {
@@ -340,7 +340,7 @@ tape('compact handling last deleted record on last block', async (t) => {
 
   await run(log.del)(11 + 3)
   await run(log.del)(22 + 6)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('deleted some records in the middle')
 
   await new Promise((resolve) => {
@@ -413,7 +413,7 @@ tape('compact handling holes of different sizes', async (t) => {
 
   await run(log.del)(3)
   await run(log.del)(14 + 0)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('deleted some records in the middle')
 
   await new Promise((resolve) => {
@@ -606,7 +606,7 @@ tape('append during compaction is postponed', async (t) => {
   t.pass('append two records')
 
   await run(log.del)(offset1)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete first record')
 
   let appendDone = false
@@ -651,7 +651,7 @@ tape('appendTransaction during compaction is postponed', async (t) => {
   t.pass('append two records')
 
   await run(log.del)(offset1)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete first record')
 
   let appendTransactionDone = false
@@ -695,17 +695,27 @@ tape('del during compaction is forbidden', async (t) => {
   t.pass('append two records')
 
   await run(log.del)(offset1)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete first record')
 
+  let compactDone = false
   log.compact((err) => {
     t.error(err, 'no error when compacting')
+    compactDone = true
   })
   const [err, offset3] = await run(log.del)(10)
   t.ok(err, 'del is forbidden')
   t.match(err.message, /Cannot delete/)
   t.notOk(offset3)
-  await run(log.onDrain)()
+
+  await new Promise((resolve) => {
+    const interval = setInterval(() => {
+      if (compactDone) {
+        clearInterval(interval)
+        resolve()
+      }
+    }, 100)
+  })
 
   await new Promise((resolve) => {
     log.stream({ offsets: false }).pipe(
@@ -734,7 +744,7 @@ tape('there can only be one compact at a time', async (t) => {
   t.pass('append two records')
 
   await run(log.del)(offset1)
-  await run(log.onDrain)()
+  await run(log.onDeletesFlushed)()
   t.pass('delete first record')
 
   let compact1Done = false
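
The test changes follow the same split: after each `log.del()` the suite now awaits `run(log.onDeletesFlushed)()` instead of `run(log.onDrain)()`, and the 'del during compaction is forbidden' test polls a `compactDone` flag rather than relying on `onDrain` to signal the end of compaction. The `run` helper is defined elsewhere in the test suite and is not shown in this diff; judging by its usage it behaves like a promisify wrapper that resolves with the callback arguments as a tuple, roughly:

```js
// Hypothetical sketch of the `run` helper used by these tests; the real
// helper lives elsewhere in the suite and may differ in detail.
function run(fn) {
  return (...args) =>
    new Promise((resolve) => {
      fn(...args, (...cbArgs) => resolve(cbArgs))
    })
}

// Usage as in the tests above:
// const [err, offset3] = await run(log.del)(10)
```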
