Skip to content
This repository was archived by the owner on May 14, 2022. It is now read-only.

Commit 3ffd529

Browse files
authored
Merge pull request #69 from ssb-ngi-pointer/crash-before-truncate
Crash before truncate
2 parents bec87f2 + b77609b commit 3ffd529

File tree

2 files changed

+139
-28
lines changed

2 files changed

+139
-28
lines changed

compaction.js

+66-28
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,46 @@ function stateFileExists(logFilename) {
1818
return fs.existsSync(getStateFilename(logFilename))
1919
}
2020

21+
const NO_TRUNCATE = 0xffffffff
22+
2123
/**
2224
* This file has state describing the continuation of the compaction algorithm.
2325
*
24-
* - bytes 0..3: UInt32LE for the blockIndex to-be-compacted
25-
* - bytes 4..7: UInt32LE for the 1st unshifted record's offset
26-
* - bytes 8..(8+blockSize-1): blockBuf containing the 1st unshifted record
26+
* - bytes 0..3: UInt32LE for the version of this file format
27+
* smallest version is 1.
28+
* - bytes 4..7: UInt32LE for block index where to perform truncation
29+
* where 0xFFFFFFFF means no truncation to-be-done yet
30+
* - bytes 8..11: UInt32LE for the blockIndex to-be-compacted
31+
* - bytes 12..15: UInt32LE for the 1st unshifted record's offset
32+
* - bytes 16..(16+blockSize-1): blockBuf containing the 1st unshifted record
2733
*/
2834
function PersistentState(logFilename, blockSize) {
2935
const raf = RAF(getStateFilename(logFilename))
3036
const writeLock = mutexify()
37+
const stateFileSize = 4 + 4 + 4 + 4 + blockSize
3138

3239
function load(cb) {
3340
raf.stat(function onRAFStatDone(err, stat) {
3441
const fileSize = !err && stat ? stat.size : -1
3542
if (fileSize <= 0) {
3643
const state = {
44+
version: 1,
45+
truncateBlockIndex: NO_TRUNCATE,
3746
compactedBlockIndex: 0,
3847
unshiftedOffset: 0,
3948
unshiftedBlockBuffer: null,
4049
initial: true,
4150
}
4251
cb(null, state)
4352
} else {
44-
const stateFileSize = 4 + 4 + blockSize
4553
raf.read(0, stateFileSize, function onFirstRAFReadDone(err, buf) {
4654
if (err) return cb(err)
4755
const state = {
48-
compactedBlockIndex: buf.readUInt32LE(0),
49-
unshiftedOffset: buf.readUInt32LE(4),
50-
unshiftedBlockBuf: buf.slice(8),
56+
version: buf.readUInt32LE(0),
57+
truncateBlockIndex: buf.readUInt32LE(4),
58+
compactedBlockIndex: buf.readUInt32LE(8),
59+
unshiftedOffset: buf.readUInt32LE(12),
60+
unshiftedBlockBuf: buf.slice(16),
5161
initial: false,
5262
}
5363
cb(null, state)
@@ -57,10 +67,12 @@ function PersistentState(logFilename, blockSize) {
5767
}
5868

5969
function save(state, cb) {
60-
const buf = Buffer.alloc(4 + 4 + blockSize)
61-
buf.writeUInt32LE(state.compactedBlockIndex, 0)
62-
buf.writeUInt32LE(state.unshiftedOffset, 4)
63-
state.unshiftedBlockBuf.copy(buf, 8)
70+
const buf = Buffer.alloc(stateFileSize)
71+
buf.writeUInt32LE(state.version, 0)
72+
buf.writeUInt32LE(state.truncateBlockIndex, 4)
73+
buf.writeUInt32LE(state.compactedBlockIndex, 8)
74+
buf.writeUInt32LE(state.unshiftedOffset, 12)
75+
state.unshiftedBlockBuf.copy(buf, 16)
6476
writeLock((unlock) => {
6577
raf.write(0, buf, function onRafWriteDone(err) {
6678
if (err) return unlock(cb, err)
@@ -78,10 +90,7 @@ function PersistentState(logFilename, blockSize) {
7890
if (stateFileExists(logFilename)) {
7991
raf.close(function onRAFClosed(err) {
8092
if (err) return cb(err)
81-
fs.unlink(raf.filename, function onStateFileDeleted(err) {
82-
if (err) return cb(err)
83-
else cb()
84-
})
93+
fs.unlink(raf.filename, cb)
8594
})
8695
} else {
8796
cb()
@@ -120,6 +129,7 @@ function Compaction(log, onDone) {
120129
const persistentState = PersistentState(log.filename, log.blockSize)
121130
const progress = Obv() // for the unshifted offset
122131
let startOffset = 0
132+
let version = 0
123133

124134
let compactedBlockIndex = -1
125135
let compactedBlockBuf = null
@@ -130,16 +140,25 @@ function Compaction(log, onDone) {
130140
let unshiftedBlockBuf = null
131141
let unshiftedOffset = 0
132142

143+
let truncateBlockIndex = NO_TRUNCATE
144+
133145
loadPersistentState(function onCompactionStateLoaded2(err) {
134146
if (err) return onDone(err)
135-
startOffset = compactedBlockIndex * log.blockSize
136-
compactedBlockIndex -= 1 // because it'll be incremented very soon
137-
compactNextBlock()
147+
if (truncateBlockIndex !== NO_TRUNCATE) {
148+
truncateAndBeDone()
149+
} else {
150+
startOffset = compactedBlockIndex * log.blockSize
151+
compactedBlockIndex -= 1 // because it'll be incremented very soon
152+
compactNextBlock()
153+
}
138154
})
139155

140156
function loadPersistentState(cb) {
141157
persistentState.load(function onCompactionStateLoaded(err, state) {
142158
if (err) return cb(err)
159+
if (state.version !== 1) return cb(new Error('unsupported state version'))
160+
version = state.version
161+
truncateBlockIndex = state.truncateBlockIndex
143162
compactedBlockIndex = state.compactedBlockIndex
144163
unshiftedOffset = state.unshiftedOffset
145164
unshiftedBlockBuf = state.unshiftedBlockBuf
@@ -163,16 +182,16 @@ function Compaction(log, onDone) {
163182

164183
function savePersistentState(cb) {
165184
if (!unshiftedBlockBuf) {
166-
loadUnshiftedBlock(function onUnshiftedBlockLoaded() {
167-
saveIt.call(this)
168-
})
185+
loadUnshiftedBlock(saveIt)
169186
} else {
170-
saveIt.call(this)
187+
saveIt()
171188
}
172189

173190
function saveIt() {
174191
persistentState.save(
175192
{
193+
version,
194+
truncateBlockIndex,
176195
compactedBlockIndex,
177196
unshiftedOffset,
178197
unshiftedBlockBuf,
@@ -249,9 +268,7 @@ function Compaction(log, onDone) {
249268
while (true) {
250269
// Fetch the unshifted block, if necessary
251270
if (!unshiftedBlockBuf) {
252-
loadUnshiftedBlock(function onUnshiftedBlockLoaded() {
253-
continueCompactingBlock()
254-
})
271+
loadUnshiftedBlock(continueCompactingBlock)
255272
return
256273
}
257274
// When all records have been shifted (thus end of log), stop compacting
@@ -273,7 +290,7 @@ function Compaction(log, onDone) {
273290
// Proceed to compact the next block if this block is full
274291
if (log.hasNoSpaceFor(unshiftedDataBuf, offsetInCompactedBlock)) {
275292
saveCompactedBlock()
276-
setImmediate(() => compactNextBlock())
293+
setImmediate(compactNextBlock)
277294
return
278295
}
279296

@@ -368,9 +385,30 @@ function Compaction(log, onDone) {
368385
function stop() {
369386
compactedBlockBuf = null
370387
unshiftedBlockBuf = null
371-
persistentState.destroy(function onCompactionStateDestroyed(err) {
388+
truncateBlockIndex = compactedBlockIndex
389+
const state = {
390+
version,
391+
truncateBlockIndex,
392+
compactedBlockIndex: 0,
393+
unshiftedOffset: 0,
394+
unshiftedBlockBuf: Buffer.alloc(0),
395+
}
396+
persistentState.save(state, function onTruncateStateSaved(err) {
372397
if (err) return onDone(err)
373-
log.truncate(compactedBlockIndex, onDone)
398+
truncateAndBeDone()
399+
})
400+
}
401+
402+
function truncateAndBeDone() {
403+
if (truncateAndBeDone === NO_TRUNCATE) {
404+
return onDone(new Error('Cannot truncate log yet'))
405+
}
406+
log.truncate(truncateBlockIndex, function onTruncatedLog(err, sizeDiff) {
407+
if (err) return onDone(err)
408+
persistentState.destroy(function onStateDestroyed(err) {
409+
if (err) return onDone(err)
410+
onDone(null, sizeDiff)
411+
})
374412
})
375413
}
376414

test/compaction.js

+73
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ tape('recovers from crash just after persisting state', async (t) => {
481481
await run(log.close)()
482482
t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]')
483483

484+
const version = [1, 0, 0, 0] // uint32LE
485+
const truncateBlockIndex = [255, 255, 255, 255] //uint32LE
484486
const compactingBlockIndex = [1, 0, 0, 0] // uint32LE
485487
const unshiftedOffset = [9 + 3, 0, 0, 0] // uint32LE
486488
const unshiftedBlock = [
@@ -491,6 +493,8 @@ tape('recovers from crash just after persisting state', async (t) => {
491493
await fs.promises.writeFile(
492494
file + '.compaction',
493495
Buffer.from([
496+
...version,
497+
...truncateBlockIndex,
494498
...compactingBlockIndex,
495499
...unshiftedOffset,
496500
...unshiftedBlock,
@@ -547,6 +551,8 @@ tape('recovers from crash just after persisting block', async (t) => {
547551
await run(log.close)()
548552
t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]')
549553

554+
const version = [1, 0, 0, 0] // uint32LE
555+
const truncateBlockIndex = [255, 255, 255, 255] //uint32LE
550556
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
551557
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
552558
const unshiftedBlock = [
@@ -557,6 +563,8 @@ tape('recovers from crash just after persisting block', async (t) => {
557563
await fs.promises.writeFile(
558564
file + '.compaction',
559565
Buffer.from([
566+
...version,
567+
...truncateBlockIndex,
560568
...compactingBlockIndex,
561569
...unshiftedOffset,
562570
...unshiftedBlock,
@@ -592,6 +600,71 @@ tape('recovers from crash just after persisting block', async (t) => {
592600
t.end()
593601
})
594602

603+
tape('restarts from crash just before truncating log', async (t) => {
604+
t.timeoutAfter(6000)
605+
const file = '/tmp/compaction-test_' + Date.now() + '.log'
606+
let log = Log(file, { blockSize: 9, codec: hexCodec })
607+
t.pass('suppose the log has blockSize 9')
608+
t.pass('suppose we had blocks: [null, 0x22], [null, 0x44] and [0x55, 0x66]')
609+
610+
await run(log.append)(
611+
[
612+
// block 0
613+
[0x22, 0x44], // offsets: 0, 3
614+
// block 1
615+
[0x55, 0x66], // offsets: 9+0, 9+3
616+
// block 2
617+
[0x55, 0x66], // offsets: 18+0, 18+3
618+
].flat()
619+
)
620+
await run(log.close)()
621+
t.pass('suppose compaction ready: [0x22, 0x44], [0x55, 0x66], [0x55, 0x66]')
622+
623+
const version = [1, 0, 0, 0] // uint32LE
624+
const truncateBlockIndex = [1, 0, 0, 0] //uint32LE
625+
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
626+
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
627+
const unshiftedBlock = [0, 0, 0, 0, 0, 0, 0, 0, 0]
628+
await fs.promises.writeFile(
629+
file + '.compaction',
630+
Buffer.from([
631+
...version,
632+
...truncateBlockIndex,
633+
...compactingBlockIndex,
634+
...unshiftedOffset,
635+
...unshiftedBlock,
636+
])
637+
)
638+
t.pass('suppose compaction file: truncateBlockIndex 1')
639+
640+
log = Log(file, { blockSize: 9, codec: hexCodec })
641+
t.pass('start log, compaction should autostart')
642+
643+
await timer(1000)
644+
645+
await new Promise((resolve) => {
646+
log.stream({ offsets: false }).pipe(
647+
push.collect((err, ary) => {
648+
t.error(err, 'no error when streaming compacted log')
649+
t.deepEqual(
650+
ary,
651+
[
652+
// block 0
653+
[0x22, 0x44],
654+
// block 1
655+
[0x55, 0x66],
656+
].flat(),
657+
'truncated to: [0x22, 0x44], [0x55, 0x66]'
658+
)
659+
resolve()
660+
})
661+
)
662+
})
663+
664+
await run(log.close)()
665+
t.end()
666+
})
667+
595668
tape('append during compaction is postponed', async (t) => {
596669
const file = '/tmp/compaction-test_' + Date.now() + '.log'
597670
const log = Log(file, { blockSize: 10 })

0 commit comments

Comments
 (0)