5 changes: 5 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
@@ -171,6 +171,9 @@ type Config struct {
PebbleCache int64
PebbleMemtableSize int64
PebbleMaxOpenFiles int

// EnableDirectIO enables O_DIRECT flag for file operations (bypasses OS cache)
EnableDirectIO bool
}

/*Configuration of the system */
@@ -321,6 +324,8 @@ func ReadConfig(deploymentMode int) {
Configuration.PebbleCache = viper.GetInt64("kv.pebble_cache")
Configuration.PebbleMemtableSize = viper.GetInt64("kv.pebble_memtable_size")
Configuration.PebbleMaxOpenFiles = viper.GetInt("kv.pebble_max_open_files")

Configuration.EnableDirectIO = viper.GetBool("storage.enable_direct_io")
}

// StorageSCConfiguration will include all the required sc configs to operate blobber
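The new flag is read from the storage.enable_direct_io viper key. Assuming the nested YAML layout implied by the neighbouring keys (kv.pebble_cache and friends), enabling it in the blobber config file would look roughly like this; the exact file and section names are an assumption, not part of this diff:

storage:
  enable_direct_io: true   # opt in to O_DIRECT for temp-file writes; unset means false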
125 changes: 120 additions & 5 deletions code/go/0chain.net/blobbercore/filestore/storage.go
@@ -41,7 +41,9 @@ import (
"sync"
"syscall"
"time"
"unsafe"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
@@ -61,10 +63,44 @@ const (
ThumbnailSuffix = "_thumbnail"
)

// getSectorSize returns the sector size for the given file path.
// It tries BLKSSZGET ioctl for block devices, and falls back to statfs for regular files.
func getSectorSize(path string) (int, error) {
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer f.Close()

const BLKSSZGET = 0x1268
var sectorSize int32
_, _, errno := syscall.Syscall(
syscall.SYS_IOCTL,
f.Fd(),
BLKSSZGET,
uintptr(unsafe.Pointer(&sectorSize)),
)
if errno == 0 && sectorSize > 0 {
return int(sectorSize), nil
}

// Fallback: use statfs for regular files
var stat syscall.Statfs_t
err = syscall.Statfs(path, &stat)
if err != nil {
return 0, err
}
if stat.Bsize > 0 {
return int(stat.Bsize), nil
}
return 512, nil // fallback default
}
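
For reference only, not part of this diff: golang.org/x/sys/unix wraps the same BLKSSZGET ioctl and statfs call, so an equivalent probe could be written with its helpers. The sketch below assumes the unix package is importable here, as the O_DIRECT open in WriteFile already calls unix.Open.

// sectorSizeViaUnix is an illustrative equivalent of getSectorSize that uses the
// x/sys/unix wrappers instead of a raw syscall.Syscall (Linux only).
func sectorSizeViaUnix(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	// Block devices report their logical sector size via BLKSSZGET.
	if ssz, err := unix.IoctlGetUint32(int(f.Fd()), unix.BLKSSZGET); err == nil && ssz > 0 {
		return int(ssz), nil
	}

	// Regular files: fall back to the filesystem block size.
	var st unix.Statfs_t
	if err := unix.Statfs(path, &st); err != nil {
		return 0, err
	}
	if st.Bsize > 0 {
		return int(st.Bsize), nil
	}
	return 512, nil
}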

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var (
initialSize int64
writtenSize int64
)
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
@@ -77,7 +113,28 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err = createDirs(filepath.Dir(tempFilePath)); err != nil {
return nil, common.NewError("dir_creation_error", err.Error())
}
f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)

useDirectIO := false
var f *os.File
var sectorSize int
if config.Configuration.EnableDirectIO {
fd, derr := unix.Open(tempFilePath, unix.O_WRONLY|unix.O_CREAT|unix.O_DIRECT, 0644)
if derr == nil {
useDirectIO = true
f, err = os.NewFile(uintptr(fd), tempFilePath), nil
// Get sector size for alignment
sectorSize, err = getSectorSize(tempFilePath)
if err != nil || sectorSize <= 0 {
sectorSize = 512 // fallback
}
} else {
logging.Logger.Warn("O_DIRECT not supported, falling back to regular file operations", zap.String("file", tempFilePath), zap.Error(derr))
}
}
if !useDirectIO {
f, err = os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
sectorSize = 0 // not needed
}
if err != nil {
return nil, common.NewError("file_open_error", err.Error())
}
@@ -87,10 +144,32 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}
buf := make([]byte, BufferSize)
writtenSize, err := io.CopyBuffer(f, infile, buf)
if err != nil {
return nil, common.NewError("file_write_error", err.Error())

if useDirectIO {
// Round BufferSize up to the next multiple of sectorSize so write lengths satisfy
// O_DIRECT's size-alignment requirement (the &^ round-up assumes sectorSize is a
// power of two, which BLKSSZGET/statfs block sizes are in practice)
alignedBufSize := BufferSize
if sectorSize > 0 {
alignedBufSize = (BufferSize + sectorSize - 1) &^ (sectorSize - 1)
}
alignedBuf := make([]byte, alignedBufSize)
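// Note (assumption): this only makes the buffer *length* a sector multiple; Go's make
// does not guarantee the slice's starting address is sector-aligned, which some
// filesystems also require for O_DIRECT and reject with EINVAL when violated.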
writtenSize, err = copyWithDirectIO(f, infile, alignedBuf, BufferSize, sectorSize)
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
// Truncate the file to the actual data size to remove padding
if writtenSize > 0 {
err = f.Truncate(fileData.UploadOffset + writtenSize)
if err != nil {
return nil, common.NewError("file_truncate_error", err.Error())
}
}
} else {
buf := make([]byte, BufferSize)
writtenSize, err = io.CopyBuffer(f, infile, buf)
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
}

finfo, err = f.Stat()
@@ -1071,3 +1150,39 @@ func sanitizeFileName(fileName string) string {
fileName = filepath.Base(fileName)
return fileName
}

// copyWithDirectIO performs I/O operations that are compatible with O_DIRECT
// O_DIRECT requires aligned buffers and aligned I/O operations
func copyWithDirectIO(dst io.Writer, src io.Reader, buf []byte, maxBufferSize int, alignment int) (int64, error) {
var totalWritten int64

for {
n, err := src.Read(buf[:maxBufferSize])
if n > 0 {
alignedSize := (n + alignment - 1) &^ (alignment - 1)
if alignedSize > len(buf) {
alignedSize = len(buf)
}
if alignedSize > n {
for i := n; i < alignedSize; i++ {
buf[i] = 0
}
}
written, writeErr := dst.Write(buf[:alignedSize])
if written > n {
written = n // Don't count padding bytes
}
totalWritten += int64(written)
if writeErr != nil {
return totalWritten, writeErr
}
}
if err != nil {
if err == io.EOF {
break
}
return totalWritten, err
}
}
return totalWritten, nil
}
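
An illustrative test sketch, not part of this diff, showing the padding behaviour; it assumes it lives in the filestore package, with bytes, strings and testing coming from the standard library. A 1000-byte source copied with alignment 512 delivers 1024 bytes to the writer while the returned count stays at 1000, which is why WriteFile truncates the file back to UploadOffset + writtenSize.

// Illustrative only: exercises copyWithDirectIO against an in-memory writer.
func TestCopyWithDirectIO_Padding(t *testing.T) {
	src := strings.NewReader(strings.Repeat("x", 1000))
	var dst bytes.Buffer
	buf := make([]byte, 1024) // buffer length is already a multiple of 512

	n, err := copyWithDirectIO(&dst, src, buf, 1024, 512)
	if err != nil || n != 1000 {
		t.Fatalf("n=%d err=%v", n, err)
	}
	// 1000 data bytes plus 24 zero bytes of padding reach the writer; the caller
	// (WriteFile) is responsible for truncating the file back to the real size.
	if dst.Len() != 1024 {
		t.Fatalf("dst.Len()=%d, want 1024", dst.Len())
	}
}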
134 changes: 134 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/storage_benchmark_test.go
@@ -0,0 +1,134 @@
package filestore

import (
"flag"
"fmt"
"math/rand"
"mime/multipart"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
gozap "go.uber.org/zap"
)

var enableDirectIO = flag.Bool("enable_directio", false, "Enable O_DIRECT/direct I/O for WriteFile benchmark")

var minFileSize = flag.Float64("min_file_size", 1.0, "Minimum file size in MB (can be fractional, e.g., 0.01 for 10KB)")
var maxFileSize = flag.Float64("max_file_size", 10.0, "Maximum file size in MB (can be fractional)")
var nFiles = flag.Int("n_files", 5000, "Number of files to generate")

// Helper to generate a random file of given size with a unique index
func generateRandomFileWithIndex(path string, size int64, idx int) error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()

buf := make([]byte, 1024*1024) // 1MB buffer
var written int64
rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(idx)))
for written < size-8 {
n := int64(len(buf))
if size-8-written < n {
n = size - 8 - written
}
_, _ = rng.Read(buf[:n])
_, err := f.Write(buf[:n])
if err != nil {
return err
}
written += n
}
// Write the index as the last 8 bytes
idxBytes := []byte(fmt.Sprintf("%08d", idx))
_, err = f.Write(idxBytes)
return err
}

// generateRandomFilesInRange generates n files in dir with random sizes between minMB and maxMB (in MB, float64),
// each file has a unique index in its name and content. Returns the list of file paths and their sizes.
func generateRandomFilesInRange(dir string, n int, minMB, maxMB float64) ([]string, []int64, error) {
if minMB > maxMB || minMB <= 0 || n <= 0 {
return nil, nil, fmt.Errorf("invalid input parameters")
}
files := make([]string, n)
sizes := make([]int64, n)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < n; i++ {
sizeMB := minMB + rng.Float64()*(maxMB-minMB)
size := int64(sizeMB * 1024 * 1024)
if size < 1 {
size = 1 // at least 1 byte
}
files[i] = filepath.Join(dir, fmt.Sprintf("src_%d.data", i))
sizes[i] = size
if err := generateRandomFileWithIndex(files[i], size, i); err != nil {
return nil, nil, fmt.Errorf("failed to generate file %d: %w", i, err)
}
}
return files, sizes, nil
}

// Minimal config and logger setup for the test
func setupTestConfigAndLogger() {
// Minimal config
config.Configuration = config.Config{
EnableDirectIO: *enableDirectIO,
}
logging.Logger, _ = gozap.NewDevelopment() // or gozap.NewNop() for silent output
}

func BenchmarkWriteFile_O_DIRECT_Batch(b *testing.B) {
tmpDir := b.TempDir()

// Generate files with random sizes in the configured min/max MB range
srcFiles, _, err := generateRandomFilesInRange(tmpDir, *nFiles, *minFileSize, *maxFileSize)
if err != nil {
b.Fatalf("failed to generate files: %v", err)
}

// Prepare FileStore (mock as needed)
fs := &FileStore{
mp: tmpDir,
mAllocs: make(map[string]*allocation),
rwMU: &sync.RWMutex{},
}

setupTestConfigAndLogger()
b.ResetTimer()
for bench := 0; bench < b.N; bench++ {
writtenFiles := make([]string, *nFiles)
for i := 0; i < *nFiles; i++ {
src, err := os.Open(srcFiles[i])
if err != nil {
b.Fatalf("failed to open src file %d: %v", i, err)
}
fileName := fmt.Sprintf("testfile_%d.data", i)
fileData := &FileInputData{
Name: fileName,
Path: "/" + fileName,
FilePathHash: fmt.Sprintf("dummyhash_%d", i),
// Size: fileSize,
}
var infile multipart.File = src

_, err = fs.WriteFile("allocid", "conid", fileData, infile)
src.Close()
if err != nil {
b.Fatalf("WriteFile failed for file %d: %v", i, err)
}
writtenFiles[i] = fs.getTempPathForFile("allocid", fileData.Name, fileData.FilePathHash, "conid")
}
// Clean up written files after each batch
for _, f := range writtenFiles {
os.Remove(f)
}
}
b.StopTimer()
}
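
One plausible way to run the benchmark with its custom flags (the package path is taken from the diff header above, and -args keeps go test from interpreting the benchmark-specific flags):

go test -run='^$' -bench=BenchmarkWriteFile_O_DIRECT_Batch -benchtime=1x \
    ./code/go/0chain.net/blobbercore/filestore/ \
    -args -enable_directio -n_files=100 -min_file_size=0.5 -max_file_size=2.0

Running it once with -enable_directio and once without compares the O_DIRECT and page-cache write paths over the same generated file set.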