Skip to content
This repository was archived by the owner on May 14, 2022. It is now read-only.

Commit 52f3479

Browse files
committed
improve compactionProgress stats
1 parent 04767b2 commit 52f3479

File tree

4 files changed

+95
-21
lines changed

4 files changed

+95
-21
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,45 @@ log.since((offset) => {
178178
})
179179
```
180180

181+
### Compact the log (remove deleted records)
182+
183+
```js
184+
log.compact((err) => {
185+
// This callback will be called once, when the compaction is done.
186+
})
187+
```
188+
189+
### Track progress of compactions
190+
191+
As an [obz] observable:
192+
193+
```js
194+
log.compactionProgress((progress) => {
195+
console.log(progress)
196+
// {
197+
// startOffset,
198+
// compactedOffset,
199+
// unshiftedOffset,
200+
// percent,
201+
// done,
202+
// sizeDiff,
203+
// }
204+
})
205+
```
206+
207+
Where
208+
209+
- `startOffset`: the starting point for compaction. All offsets smaller than
210+
this have been left untouched by the compaction algorithm.
211+
- `compactedOffset`: all records up until this point have been compacted so far.
212+
- `unshiftedOffset`: offset for the first record that hasn't yet been "moved"
213+
to previous slots. Tracking this allows you to see the algorithm proceeding.
214+
- `percent`: a number between 0 and 1 to indicate the progress of compaction.
215+
- `done`: a boolean indicating whether compaction is ongoing (`false`) or done
216+
(`true`).
217+
- `sizeDiff`: number of bytes freed after compaction is finished. Only available
218+
if `done` is `true`.
219+
181220
### Close the log
182221

183222
```js

compaction.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ function PersistentState(logFilename, blockSize) {
118118
*/
119119
function Compaction(log, onDone) {
120120
const persistentState = PersistentState(log.filename, log.blockSize)
121-
const since = Obv() // for the unshifted offset
121+
const progress = Obv() // for the unshifted offset
122+
let startOffset = 0
122123

123124
let compactedBlockIndex = -1
124125
let compactedBlockBuf = null
@@ -131,6 +132,7 @@ function Compaction(log, onDone) {
131132

132133
loadPersistentState(function onCompactionStateLoaded2(err) {
133134
if (err) return onDone(err)
135+
startOffset = compactedBlockIndex * log.blockSize
134136
compactedBlockIndex -= 1 // because it'll be incremented very soon
135137
compactNextBlock()
136138
})
@@ -341,17 +343,28 @@ function Compaction(log, onDone) {
341343
}
342344

343345
function compactNextBlock() {
344-
since.set(unshiftedOffset)
345346
compactedBlockIndex += 1
346347
compactedBlockBuf = Buffer.alloc(log.blockSize)
347348
compactedOffset = compactedBlockIndex * log.blockSize
348349
compactedBlockIdenticalToUnshifted = true
350+
progress.set(calculateProgressStats())
349351
savePersistentState(function onCompactionStateSaved(err) {
350352
if (err) return onDone(err)
351353
continueCompactingBlock()
352354
})
353355
}
354356

357+
function calculateProgressStats() {
358+
const percent =
359+
(unshiftedOffset - startOffset) / (log.since.value - startOffset)
360+
return {
361+
startOffset,
362+
compactedOffset,
363+
unshiftedOffset,
364+
percent,
365+
}
366+
}
367+
355368
function stop() {
356369
compactedBlockBuf = null
357370
unshiftedBlockBuf = null
@@ -362,7 +375,7 @@ function Compaction(log, onDone) {
362375
}
363376

364377
return {
365-
since,
378+
progress,
366379
}
367380
}
368381

index.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
6060
let nextOffsetInBlock = null
6161
const since = Obv() // offset of last written record
6262
let compaction = null
63-
const compactionSince = Obv()
63+
const compactionProgress = Obv()
6464
const waitingCompaction = []
6565

6666
onLoad(function maybeResumeCompaction() {
@@ -442,12 +442,12 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
442442
compaction = new Compaction(self, (err, sizeDiff) => {
443443
compaction = null
444444
if (err) return cb(err)
445-
compactionSince.set({ value: sizeDiff, done: true })
445+
compactionProgress.set({ sizeDiff, percent: 1, done: true })
446446
while (waitingCompaction.length) waitingCompaction.shift()()
447447
cb()
448448
})
449-
compaction.since((unshiftedOffset) => {
450-
compactionSince.set({ value: unshiftedOffset, done: false })
449+
compaction.progress((stats) => {
450+
compactionProgress.set({ ...stats, done: false })
451451
})
452452
})
453453
}
@@ -499,7 +499,7 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
499499
onDrain: onLoad(onDrain),
500500
compact: onLoad(compact),
501501
since,
502-
compactionSince,
502+
compactionProgress,
503503
stream(opts) {
504504
const stream = new Stream(self, opts)
505505
self.streams.push(stream)

test/compaction.js

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,47 @@ tape('shift many blocks', async (t) => {
178178
})
179179

180180
const progressArr = []
181-
log.compactionSince((obj) => {
182-
progressArr.push(obj)
181+
log.compactionProgress((stats) => {
182+
progressArr.push(stats)
183183
})
184184

185185
const [err] = await run(log.compact)()
186186
await run(log.onDrain)()
187187
t.error(err, 'no error when compacting')
188188

189+
t.deepEquals(
190+
progressArr,
191+
[
192+
{
193+
startOffset: 11,
194+
compactedOffset: 11,
195+
unshiftedOffset: 11,
196+
percent: 0,
197+
done: false,
198+
},
199+
{
200+
startOffset: 11,
201+
compactedOffset: 22,
202+
unshiftedOffset: 28,
203+
percent: 0.4358974358974359,
204+
done: false,
205+
},
206+
{
207+
startOffset: 11,
208+
compactedOffset: 33,
209+
unshiftedOffset: 44,
210+
percent: 0.8461538461538461,
211+
done: false,
212+
},
213+
{
214+
sizeDiff: 11, // the log is now 1 block shorter
215+
percent: 1,
216+
done: true,
217+
},
218+
],
219+
'progress events'
220+
)
221+
189222
await new Promise((resolve) => {
190223
log.stream({ offsets: false }).pipe(
191224
push.collect((err, ary) => {
@@ -209,17 +242,6 @@ tape('shift many blocks', async (t) => {
209242
)
210243
})
211244

212-
t.deepEquals(
213-
progressArr,
214-
[
215-
{ value: 11 + 0, done: false },
216-
{ value: 22 + 6, done: false },
217-
{ value: 44 + 0, done: false },
218-
{ value: 11, done: true }, // the log is now 1 block shorter
219-
],
220-
'progress events'
221-
)
222-
223245
await run(log.close)()
224246
t.end()
225247
})

0 commit comments

Comments
 (0)