@@ -462,6 +462,7 @@ const (
462
462
compactionKindRead
463
463
compactionKindRewrite
464
464
compactionKindIngestedFlushable
465
+ compactionKindBufferedFlush
465
466
)
466
467
467
468
func (k compactionKind ) String () string {
@@ -1874,6 +1875,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
1874
1875
var n , inputs int
1875
1876
var inputBytes uint64
1876
1877
var ingest bool
1878
+ var bufferedFlush = true // TODO(aaditya): loop this into a config setting
1877
1879
for ; n < len (d .mu .mem .queue )- 1 ; n ++ {
1878
1880
if f , ok := d .mu .mem .queue [n ].flushable .(* ingestedFlushable ); ok {
1879
1881
if n == 0 {
@@ -1932,6 +1934,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
1932
1934
if err != nil {
1933
1935
return 0 , err
1934
1936
}
1937
+
1938
+ if bufferedFlush && ! ingest {
1939
+ c .kind = compactionKindBufferedFlush
1940
+ }
1935
1941
d .addInProgressCompaction (c )
1936
1942
1937
1943
jobID := d .mu .nextJobID
@@ -1946,7 +1952,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
1946
1952
1947
1953
// Compactions always write directly to the database's object provider.
1948
1954
// Flushes may write to an in-memory object provider first.
1949
- var objCreator objectCreator = d .objProvider
1955
+ var objCreator objectCreator
1956
+ if c .kind == compactionKindBufferedFlush {
1957
+ bufferedSSTs := & bufferedSSTables {}
1958
+ // TODO(aaditya): pick a better size
1959
+ bufferedSSTs .init (10 )
1960
+ objCreator = bufferedSSTs
1961
+ } else {
1962
+ objCreator = d .objProvider
1963
+ }
1964
+
1950
1965
var ve * manifest.VersionEdit
1951
1966
var pendingOutputs []physicalMeta
1952
1967
var stats compactStats
@@ -1963,6 +1978,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
1963
1978
// TODO(aadityas,jackson): If the buffered output sstables are too small,
1964
1979
// avoid linking them into the version and just update the flushable queue
1965
1980
// appropriately.
1981
+ if c .kind == compactionKindBufferedFlush {
1982
+ var metas []* fileMetadata
1983
+ var fileNums []base.DiskFileNum
1984
+ for _ , file := range ve .NewFiles {
1985
+ metas = append (metas , file .Meta )
1986
+ fileNums = append (fileNums , file .BackingFileNum )
1987
+ }
1988
+
1989
+ bufferedSST := objCreator .(* bufferedSSTables )
1990
+ if bufferedSST .size < d .opts .MemTableSize /* TODO(aaditya): does this make sense? */ {
1991
+ var f flushable
1992
+ f , err = newFlushableBufferedSSTables (d .opts .Comparer , metas , sstable.ReaderOptions {}, bufferedSST )
1993
+ fe := d .newFlushableEntry (f , fileNums [0 ], 0 /* TODO(aaditya): figure out what to put here */ )
1994
+ remaining := d .mu .mem .queue [n : len (d .mu .mem .queue )- 2 ]
1995
+ mutable := d .mu .mem .queue [len (d .mu .mem .queue )- 1 ]
1996
+ d .mu .mem .queue = append (remaining , fe , mutable )
1997
+ return 0 , err
1998
+ }
1999
+
2000
+ // else convert to objProvider and write to disk
2001
+ }
1966
2002
1967
2003
// Acquire logLock. This will be released either on an error, by way of
1968
2004
// logUnlock, or through a call to logAndApply if there is no error.
0 commit comments