44 "encoding/binary"
55 "errors"
66 "fmt"
7+ "github.com/godzie44/go-uring/uring"
78 "os"
89
910 pool "github.com/libp2p/go-buffer-pool"
@@ -21,14 +22,14 @@ type FileWriter struct {
2122 open bool
2223 closed bool
2324
24- file * os.File
25- bufWriter WriteCloserFlusher
26- currentOffset uint64
27- compressionType int
28- compressor compressor.CompressionI
29- recordHeaderCache []byte
30- bufferPool * pool.BufferPool
31- directIOEnabled bool
25+ file * os.File
26+ bufWriter WriteCloserFlusher
27+ currentOffset uint64
28+ compressionType int
29+ compressor compressor.CompressionI
30+ recordHeaderCache []byte
31+ bufferPool * pool.BufferPool
32+ alignedBlockWrites bool
3233}
3334
3435var DirectIOSyncWriteErr = errors .New ("currently not supporting directIO with sync writing" )
@@ -59,7 +60,7 @@ func (w *FileWriter) Open() error {
5960
6061 // we flush early to get a valid file with header written, this is important in crash scenarios
6162 // when directIO is enabled however, we can't write misaligned blocks - thus this is not executed
62- if ! w .directIOEnabled {
63+ if ! w .alignedBlockWrites {
6364 err = w .bufWriter .Flush ()
6465 if err != nil {
6566 return fmt .Errorf ("flushing header in file at '%s' failed with %w" , w .file .Name (), err )
@@ -160,9 +161,10 @@ func (w *FileWriter) Write(record []byte) (uint64, error) {
160161 return prevOffset , nil
161162}
162163
163- // WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to
164+ // WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to.
165+ // When directIO is enabled however, we can't write misaligned blocks and immediately returns DirectIOSyncWriteErr
164166func (w * FileWriter ) WriteSync (record []byte ) (uint64 , error ) {
165- if w .directIOEnabled {
167+ if w .alignedBlockWrites {
166168 return 0 , DirectIOSyncWriteErr
167169 }
168170
@@ -205,11 +207,14 @@ func (w *FileWriter) Size() uint64 {
205207// options
206208
207209type FileWriterOptions struct {
208- path string
209- file * os.File
210- compressionType int
211- bufferSizeBytes int
212- useDirectIO bool
210+ path string
211+ file * os.File
212+ compressionType int
213+ bufferSizeBytes int
214+ enableDirectIO bool
215+ enableIOUring bool
216+ ioUringNumRingEntries uint32
217+ ioUringOpts []uring.SetupOption
213218}
214219
215220type FileWriterOption func (* FileWriterOptions )
@@ -246,21 +251,35 @@ func BufferSizeBytes(p int) FileWriterOption {
246251 }
247252}
248253
249- // DirectIO is experimental: this flag enables DirectIO while writing, this currently might not work due to the misaligned allocations
254+ // DirectIO is experimental: this flag enables DirectIO while writing. This has some limitation when writing headers and
255+ // disables the ability to use WriteSync.
250256func DirectIO () FileWriterOption {
251257 return func (args * FileWriterOptions ) {
252- args .useDirectIO = true
258+ args .enableDirectIO = true
259+ }
260+ }
261+
262+ // IOUring is experimental: this flag enables async writes using io_uring. This has some limitation around platform, it
263+ // needs Linux and recent 5.x kernel to work. This currently also does not work together with DirectIO.
264+ func IOUring (numRingEntries uint32 , opts ... uring.SetupOption ) FileWriterOption {
265+ return func (args * FileWriterOptions ) {
266+ args .enableIOUring = true
267+ args .ioUringNumRingEntries = numRingEntries
268+ args .ioUringOpts = opts
253269 }
254270}
255271
256272// NewFileWriter creates a new writer with the given options, either Path or File must be supplied, compression is optional.
257273func NewFileWriter (writerOptions ... FileWriterOption ) (WriterI , error ) {
258274 opts := & FileWriterOptions {
259- path : "" ,
260- file : nil ,
261- compressionType : CompressionTypeNone ,
262- bufferSizeBytes : DefaultBufferSize ,
263- useDirectIO : false ,
275+ path : "" ,
276+ file : nil ,
277+ compressionType : CompressionTypeNone ,
278+ bufferSizeBytes : DefaultBufferSize ,
279+ enableDirectIO : false ,
280+ enableIOUring : false ,
281+ ioUringNumRingEntries : 4 ,
282+ ioUringOpts : nil ,
264283 }
265284
266285 for _ , writeOption := range writerOptions {
@@ -271,13 +290,19 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
271290 return nil , errors .New ("NewFileWriter: either os.File or string path must be supplied, never both" )
272291 }
273292
293+ if opts .enableIOUring && opts .enableDirectIO {
294+ return nil , errors .New ("NewFileWriter: either directIO or io_uring must be enabled, never both" )
295+ }
296+
274297 if opts .path == "" {
275298 opts .path = opts .file .Name ()
276299 }
277300
278301 var factory ReaderWriterCloserFactory
279- if opts .useDirectIO {
302+ if opts .enableDirectIO {
280303 factory = DirectIOFactory {}
304+ } else if opts .enableIOUring {
305+ factory = NewIOUringFactory (opts .ioUringNumRingEntries , opts .ioUringOpts ... )
281306 } else {
282307 factory = BufferedIOFactory {}
283308 }
@@ -294,18 +319,18 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
294319 if err != nil {
295320 return nil , fmt .Errorf ("failed to create new Writer at '%s' failed with %w" , opts .path , err )
296321 }
297- return newCompressedFileWriterWithFile (file , writer , opts .compressionType , opts .useDirectIO )
322+ return newCompressedFileWriterWithFile (file , writer , opts .compressionType , opts .enableDirectIO )
298323}
299324
300325// creates a new writer with the given os.File, with the desired compression
301- func newCompressedFileWriterWithFile (file * os.File , bufWriter WriteCloserFlusher , compType int , directIOEnabled bool ) (WriterI , error ) {
326+ func newCompressedFileWriterWithFile (file * os.File , bufWriter WriteCloserFlusher , compType int , alignedBlockWrites bool ) (WriterI , error ) {
302327 return & FileWriter {
303- file : file ,
304- bufWriter : bufWriter ,
305- directIOEnabled : directIOEnabled ,
306- open : false ,
307- closed : false ,
308- compressionType : compType ,
309- currentOffset : 0 ,
328+ file : file ,
329+ bufWriter : bufWriter ,
330+ alignedBlockWrites : alignedBlockWrites ,
331+ open : false ,
332+ closed : false ,
333+ compressionType : compType ,
334+ currentOffset : 0 ,
310335 }, nil
311336}
0 commit comments