@@ -18,36 +18,46 @@ function stateFileExists(logFilename) {
18
18
return fs . existsSync ( getStateFilename ( logFilename ) )
19
19
}
20
20
21
+ const NO_TRUNCATE = 0xffffffff
22
+
21
23
/**
22
24
* This file has state describing the continuation of the compaction algorithm.
23
25
*
24
- * - bytes 0..3: UInt32LE for the blockIndex to-be-compacted
25
- * - bytes 4..7: UInt32LE for the 1st unshifted record's offset
26
- * - bytes 8..(8+blockSize-1): blockBuf containing the 1st unshifted record
26
+ * - bytes 0..3: UInt32LE for the version of this file format
27
+ * smallest version is 1.
28
+ * - bytes 4..7: UInt32LE for block index where to perform truncation
29
+ * where 0xFFFFFFFF means no truncation to-be-done yet
30
+ * - bytes 8..11: UInt32LE for the blockIndex to-be-compacted
31
+ * - bytes 12..15: UInt32LE for the 1st unshifted record's offset
32
+ * - bytes 16..(16+blockSize-1): blockBuf containing the 1st unshifted record
27
33
*/
28
34
function PersistentState ( logFilename , blockSize ) {
29
35
const raf = RAF ( getStateFilename ( logFilename ) )
30
36
const writeLock = mutexify ( )
37
+ const stateFileSize = 4 + 4 + 4 + 4 + blockSize
31
38
32
39
function load ( cb ) {
33
40
raf . stat ( function onRAFStatDone ( err , stat ) {
34
41
const fileSize = ! err && stat ? stat . size : - 1
35
42
if ( fileSize <= 0 ) {
36
43
const state = {
44
+ version : 1 ,
45
+ truncateBlockIndex : NO_TRUNCATE ,
37
46
compactedBlockIndex : 0 ,
38
47
unshiftedOffset : 0 ,
39
48
unshiftedBlockBuffer : null ,
40
49
initial : true ,
41
50
}
42
51
cb ( null , state )
43
52
} else {
44
- const stateFileSize = 4 + 4 + blockSize
45
53
raf . read ( 0 , stateFileSize , function onFirstRAFReadDone ( err , buf ) {
46
54
if ( err ) return cb ( err )
47
55
const state = {
48
- compactedBlockIndex : buf . readUInt32LE ( 0 ) ,
49
- unshiftedOffset : buf . readUInt32LE ( 4 ) ,
50
- unshiftedBlockBuf : buf . slice ( 8 ) ,
56
+ version : buf . readUInt32LE ( 0 ) ,
57
+ truncateBlockIndex : buf . readUInt32LE ( 4 ) ,
58
+ compactedBlockIndex : buf . readUInt32LE ( 8 ) ,
59
+ unshiftedOffset : buf . readUInt32LE ( 12 ) ,
60
+ unshiftedBlockBuf : buf . slice ( 16 ) ,
51
61
initial : false ,
52
62
}
53
63
cb ( null , state )
@@ -57,10 +67,12 @@ function PersistentState(logFilename, blockSize) {
57
67
}
58
68
59
69
function save ( state , cb ) {
60
- const buf = Buffer . alloc ( 4 + 4 + blockSize )
61
- buf . writeUInt32LE ( state . compactedBlockIndex , 0 )
62
- buf . writeUInt32LE ( state . unshiftedOffset , 4 )
63
- state . unshiftedBlockBuf . copy ( buf , 8 )
70
+ const buf = Buffer . alloc ( stateFileSize )
71
+ buf . writeUInt32LE ( state . version , 0 )
72
+ buf . writeUInt32LE ( state . truncateBlockIndex , 4 )
73
+ buf . writeUInt32LE ( state . compactedBlockIndex , 8 )
74
+ buf . writeUInt32LE ( state . unshiftedOffset , 12 )
75
+ state . unshiftedBlockBuf . copy ( buf , 16 )
64
76
writeLock ( ( unlock ) => {
65
77
raf . write ( 0 , buf , function onRafWriteDone ( err ) {
66
78
if ( err ) return unlock ( cb , err )
@@ -117,6 +129,7 @@ function Compaction(log, onDone) {
117
129
const persistentState = PersistentState ( log . filename , log . blockSize )
118
130
const progress = Obv ( ) // for the unshifted offset
119
131
let startOffset = 0
132
+ let version = 0
120
133
121
134
let compactedBlockIndex = - 1
122
135
let compactedBlockBuf = null
@@ -127,16 +140,25 @@ function Compaction(log, onDone) {
127
140
let unshiftedBlockBuf = null
128
141
let unshiftedOffset = 0
129
142
143
+ let truncateBlockIndex = NO_TRUNCATE
144
+
130
145
loadPersistentState ( function onCompactionStateLoaded2 ( err ) {
131
146
if ( err ) return onDone ( err )
132
- startOffset = compactedBlockIndex * log . blockSize
133
- compactedBlockIndex -= 1 // because it'll be incremented very soon
134
- compactNextBlock ( )
147
+ if ( truncateBlockIndex !== NO_TRUNCATE ) {
148
+ truncateAndBeDone ( )
149
+ } else {
150
+ startOffset = compactedBlockIndex * log . blockSize
151
+ compactedBlockIndex -= 1 // because it'll be incremented very soon
152
+ compactNextBlock ( )
153
+ }
135
154
} )
136
155
137
156
function loadPersistentState ( cb ) {
138
157
persistentState . load ( function onCompactionStateLoaded ( err , state ) {
139
158
if ( err ) return cb ( err )
159
+ if ( state . version !== 1 ) return cb ( new Error ( 'unsupported state version' ) )
160
+ version = state . version
161
+ truncateBlockIndex = state . truncateBlockIndex
140
162
compactedBlockIndex = state . compactedBlockIndex
141
163
unshiftedOffset = state . unshiftedOffset
142
164
unshiftedBlockBuf = state . unshiftedBlockBuf
@@ -168,6 +190,8 @@ function Compaction(log, onDone) {
168
190
function saveIt ( ) {
169
191
persistentState . save (
170
192
{
193
+ version,
194
+ truncateBlockIndex,
171
195
compactedBlockIndex,
172
196
unshiftedOffset,
173
197
unshiftedBlockBuf,
@@ -361,9 +385,30 @@ function Compaction(log, onDone) {
361
385
function stop ( ) {
362
386
compactedBlockBuf = null
363
387
unshiftedBlockBuf = null
364
- persistentState . destroy ( function onCompactionStateDestroyed ( err ) {
388
+ truncateBlockIndex = compactedBlockIndex
389
+ const state = {
390
+ version,
391
+ truncateBlockIndex,
392
+ compactedBlockIndex : 0 ,
393
+ unshiftedOffset : 0 ,
394
+ unshiftedBlockBuf : Buffer . alloc ( 0 ) ,
395
+ }
396
+ persistentState . save ( state , function onTruncateStateSaved ( err ) {
365
397
if ( err ) return onDone ( err )
366
- log . truncate ( compactedBlockIndex , onDone )
398
+ truncateAndBeDone ( )
399
+ } )
400
+ }
401
+
402
+ function truncateAndBeDone ( ) {
403
+ if ( truncateAndBeDone === NO_TRUNCATE ) {
404
+ return onDone ( new Error ( 'Cannot truncate log yet' ) )
405
+ }
406
+ log . truncate ( truncateBlockIndex , function onTruncatedLog ( err , sizeDiff ) {
407
+ if ( err ) return onDone ( err )
408
+ persistentState . destroy ( function onStateDestroyed ( err ) {
409
+ if ( err ) return onDone ( err )
410
+ onDone ( null , sizeDiff )
411
+ } )
367
412
} )
368
413
}
369
414
0 commit comments