Skip to content
This repository was archived by the owner on May 14, 2022. It is now read-only.

Commit 89a164d

Browse files
authored
Merge pull request #70 from ssb-ngi-pointer/compact-reindex
first compactionProgress events should have startOffset
2 parents 3ffd529 + 531baac commit 89a164d

File tree

3 files changed

+147
-16
lines changed

3 files changed

+147
-16
lines changed

compaction.js

+22-15
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,25 @@ const NO_TRUNCATE = 0xffffffff
2525
*
2626
* - bytes 0..3: UInt32LE for the version of this file format
2727
* smallest version is 1.
28-
* - bytes 4..7: UInt32LE for block index where to perform truncation
29-
* where 0xFFFFFFFF means no truncation to-be-done yet
30-
* - bytes 8..11: UInt32LE for the blockIndex to-be-compacted
31-
* - bytes 12..15: UInt32LE for the 1st unshifted record's offset
32-
* - bytes 16..(16+blockSize-1): blockBuf containing the 1st unshifted record
28+
* - bytes 4..7: UInt32LE for the startOffset, usually the start of some block
29+
* - bytes 8..11: UInt32LE for block index where to perform truncation
30+
* where 0xFFFFFFFF means no truncation to-be-done yet
31+
* - bytes 12..15: UInt32LE for the blockIndex to-be-compacted
32+
* - bytes 16..19: UInt32LE for the 1st unshifted record's offset
33+
* - bytes 20..(20+blockSize-1): blockBuf containing the 1st unshifted record
3334
*/
3435
function PersistentState(logFilename, blockSize) {
3536
const raf = RAF(getStateFilename(logFilename))
3637
const writeLock = mutexify()
37-
const stateFileSize = 4 + 4 + 4 + 4 + blockSize
38+
const stateFileSize = 4 + 4 + 4 + 4 + 4 + blockSize
3839

3940
function load(cb) {
4041
raf.stat(function onRAFStatDone(err, stat) {
4142
const fileSize = !err && stat ? stat.size : -1
4243
if (fileSize <= 0) {
4344
const state = {
4445
version: 1,
46+
startOffset: 0,
4547
truncateBlockIndex: NO_TRUNCATE,
4648
compactedBlockIndex: 0,
4749
unshiftedOffset: 0,
@@ -54,10 +56,11 @@ function PersistentState(logFilename, blockSize) {
5456
if (err) return cb(err)
5557
const state = {
5658
version: buf.readUInt32LE(0),
57-
truncateBlockIndex: buf.readUInt32LE(4),
58-
compactedBlockIndex: buf.readUInt32LE(8),
59-
unshiftedOffset: buf.readUInt32LE(12),
60-
unshiftedBlockBuf: buf.slice(16),
59+
startOffset: buf.readUInt32LE(4),
60+
truncateBlockIndex: buf.readUInt32LE(8),
61+
compactedBlockIndex: buf.readUInt32LE(12),
62+
unshiftedOffset: buf.readUInt32LE(16),
63+
unshiftedBlockBuf: buf.slice(20),
6164
initial: false,
6265
}
6366
cb(null, state)
@@ -69,10 +72,11 @@ function PersistentState(logFilename, blockSize) {
6972
function save(state, cb) {
7073
const buf = Buffer.alloc(stateFileSize)
7174
buf.writeUInt32LE(state.version, 0)
72-
buf.writeUInt32LE(state.truncateBlockIndex, 4)
73-
buf.writeUInt32LE(state.compactedBlockIndex, 8)
74-
buf.writeUInt32LE(state.unshiftedOffset, 12)
75-
state.unshiftedBlockBuf.copy(buf, 16)
75+
buf.writeUint32LE(state.startOffset, 4)
76+
buf.writeUInt32LE(state.truncateBlockIndex, 8)
77+
buf.writeUInt32LE(state.compactedBlockIndex, 12)
78+
buf.writeUInt32LE(state.unshiftedOffset, 16)
79+
state.unshiftedBlockBuf.copy(buf, 20)
7680
writeLock((unlock) => {
7781
raf.write(0, buf, function onRafWriteDone(err) {
7882
if (err) return unlock(cb, err)
@@ -147,7 +151,6 @@ function Compaction(log, onDone) {
147151
if (truncateBlockIndex !== NO_TRUNCATE) {
148152
truncateAndBeDone()
149153
} else {
150-
startOffset = compactedBlockIndex * log.blockSize
151154
compactedBlockIndex -= 1 // because it'll be incremented very soon
152155
compactNextBlock()
153156
}
@@ -158,6 +161,7 @@ function Compaction(log, onDone) {
158161
if (err) return cb(err)
159162
if (state.version !== 1) return cb(new Error('unsupported state version'))
160163
version = state.version
164+
startOffset = state.startOffset
161165
truncateBlockIndex = state.truncateBlockIndex
162166
compactedBlockIndex = state.compactedBlockIndex
163167
unshiftedOffset = state.unshiftedOffset
@@ -166,6 +170,7 @@ function Compaction(log, onDone) {
166170
if (state.initial) {
167171
findStateFromLog(function foundStateFromLog(err, state) {
168172
if (err) return cb(err)
173+
startOffset = state.compactedBlockIndex * log.blockSize
169174
compactedBlockIndex = state.compactedBlockIndex
170175
unshiftedOffset = state.unshiftedOffset
171176
unshiftedBlockBuf = state.unshiftedBlockBuf
@@ -191,6 +196,7 @@ function Compaction(log, onDone) {
191196
persistentState.save(
192197
{
193198
version,
199+
startOffset,
194200
truncateBlockIndex,
195201
compactedBlockIndex,
196202
unshiftedOffset,
@@ -388,6 +394,7 @@ function Compaction(log, onDone) {
388394
truncateBlockIndex = compactedBlockIndex
389395
const state = {
390396
version,
397+
startOffset,
391398
truncateBlockIndex,
392399
compactedBlockIndex: 0,
393400
unshiftedOffset: 0,

index.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
6262
let nextOffsetInBlock = null
6363
const since = Obv() // offset of last written record
6464
let compaction = null
65-
const compactionProgress = Obv()
65+
const compactionProgress = Obv().set(
66+
Compaction.stateFileExists(filename)
67+
? { done: false }
68+
: { sizeDiff: 0, percent: 1, done: true }
69+
)
6670
const waitingCompaction = []
6771

6872
onLoad(function maybeResumeCompaction() {

test/compaction.js

+120
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ tape('shift many blocks', async (t) => {
189189
t.deepEquals(
190190
progressArr,
191191
[
192+
{
193+
sizeDiff: 0,
194+
percent: 1,
195+
done: true,
196+
},
192197
{
193198
startOffset: 11,
194199
compactedOffset: 11,
@@ -463,6 +468,88 @@ tape('compact handling holes of different sizes', async (t) => {
463468
t.end()
464469
})
465470

471+
tape('startOffset is correct', async (t) => {
472+
t.timeoutAfter(6000)
473+
const file = '/tmp/compaction-test_' + Date.now() + '.log'
474+
const log = Log(file, { blockSize: 9, codec: hexCodec })
475+
476+
await run(log.append)(
477+
[
478+
// block 0
479+
[0x11, 0x22], // offsets: 0, 3
480+
// block 1
481+
[0x33, 0x44], // offsets: 9+0, 9+3
482+
].flat()
483+
)
484+
await run(log.onDrain)()
485+
t.pass('append four records')
486+
487+
await run(log.del)(0)
488+
await run(log.onDeletesFlushed)()
489+
t.pass('delete 1st record')
490+
491+
const progressArr = []
492+
log.compactionProgress((stats) => {
493+
progressArr.push(stats)
494+
})
495+
496+
const [err] = await run(log.compact)()
497+
t.error(err, 'no error when compacting')
498+
499+
t.deepEquals(
500+
progressArr,
501+
[
502+
{
503+
sizeDiff: 0,
504+
percent: 1,
505+
done: true,
506+
},
507+
{
508+
startOffset: 0,
509+
compactedOffset: 0,
510+
unshiftedOffset: 3,
511+
percent: 0.25,
512+
done: false,
513+
},
514+
{
515+
startOffset: 0,
516+
compactedOffset: 9,
517+
unshiftedOffset: 12,
518+
percent: 1,
519+
done: false,
520+
},
521+
{
522+
sizeDiff: 0,
523+
percent: 1,
524+
done: true,
525+
},
526+
],
527+
'progress events'
528+
)
529+
530+
await new Promise((resolve) => {
531+
log.stream({ offsets: false }).pipe(
532+
push.collect((err, ary) => {
533+
t.error(err, 'no error when streaming compacted log')
534+
t.deepEqual(
535+
ary,
536+
[
537+
// block 0
538+
[0x22, 0x33],
539+
// block 1
540+
[0x44],
541+
].flat(),
542+
'log has 2 blocks'
543+
)
544+
resolve()
545+
})
546+
)
547+
})
548+
549+
await run(log.close)()
550+
t.end()
551+
})
552+
466553
tape('recovers from crash just after persisting state', async (t) => {
467554
t.timeoutAfter(6000)
468555
const file = '/tmp/compaction-test_' + Date.now() + '.log'
@@ -482,6 +569,7 @@ tape('recovers from crash just after persisting state', async (t) => {
482569
t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]')
483570

484571
const version = [1, 0, 0, 0] // uint32LE
572+
const startOffset = [0, 0, 0, 0] // uint32LE
485573
const truncateBlockIndex = [255, 255, 255, 255] //uint32LE
486574
const compactingBlockIndex = [1, 0, 0, 0] // uint32LE
487575
const unshiftedOffset = [9 + 3, 0, 0, 0] // uint32LE
@@ -494,6 +582,7 @@ tape('recovers from crash just after persisting state', async (t) => {
494582
file + '.compaction',
495583
Buffer.from([
496584
...version,
585+
...startOffset,
497586
...truncateBlockIndex,
498587
...compactingBlockIndex,
499588
...unshiftedOffset,
@@ -506,8 +595,35 @@ tape('recovers from crash just after persisting state', async (t) => {
506595
log = Log(file, { blockSize: 9, codec: hexCodec })
507596
t.pass('start log, compaction should autostart')
508597

598+
const progressArr = []
599+
log.compactionProgress((stats) => {
600+
progressArr.push(stats)
601+
})
602+
509603
await timer(1000)
510604

605+
t.deepEquals(
606+
progressArr,
607+
[
608+
{
609+
done: false,
610+
},
611+
{
612+
startOffset: 0,
613+
compactedOffset: 9,
614+
unshiftedOffset: 12,
615+
percent: 1,
616+
done: false,
617+
},
618+
{
619+
sizeDiff: 0,
620+
percent: 1,
621+
done: true,
622+
},
623+
],
624+
'progress events'
625+
)
626+
511627
await new Promise((resolve) => {
512628
log.stream({ offsets: false }).pipe(
513629
push.collect((err, ary) => {
@@ -552,6 +668,7 @@ tape('recovers from crash just after persisting block', async (t) => {
552668
t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]')
553669

554670
const version = [1, 0, 0, 0] // uint32LE
671+
const startOffset = [0,0,0,0] // uint32LE
555672
const truncateBlockIndex = [255, 255, 255, 255] //uint32LE
556673
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
557674
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
@@ -564,6 +681,7 @@ tape('recovers from crash just after persisting block', async (t) => {
564681
file + '.compaction',
565682
Buffer.from([
566683
...version,
684+
...startOffset,
567685
...truncateBlockIndex,
568686
...compactingBlockIndex,
569687
...unshiftedOffset,
@@ -621,6 +739,7 @@ tape('restarts from crash just before truncating log', async (t) => {
621739
t.pass('suppose compaction ready: [0x22, 0x44], [0x55, 0x66], [0x55, 0x66]')
622740

623741
const version = [1, 0, 0, 0] // uint32LE
742+
const startOffset = [0,0,0,0] // uint32LE
624743
const truncateBlockIndex = [1, 0, 0, 0] //uint32LE
625744
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
626745
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
@@ -629,6 +748,7 @@ tape('restarts from crash just before truncating log', async (t) => {
629748
file + '.compaction',
630749
Buffer.from([
631750
...version,
751+
...startOffset,
632752
...truncateBlockIndex,
633753
...compactingBlockIndex,
634754
...unshiftedOffset,

0 commit comments

Comments
 (0)