Skip to content

Commit

Permalink
Stream on read 145 (#148)
Browse files Browse the repository at this point in the history
* removed need to copy to temp file for read/seek. now streams

* improve test coverage

* remove now-unused getObjectInput func

* simplified error messages for seek.  refactored seekTo() to be simpler and account for invalid whence. Updated test accordingly

* remove old version of seek test. add bad whence test.

* update changelog. Fixes #145

* add comments to sentinel errors
  • Loading branch information
funkyshu authored Nov 28, 2023
1 parent 70f09c4 commit 17e669e
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 176 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Fixed
- fixed #145 - Remove use of local temp file when reading/seeking from s3 files. This should improve performance by allowing streaming reads from s3 files.

## [6.9.1] - 2023-11-21
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion backend/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/c2fo/vfs/v6"
)

// ValidateCopySeekPosition return ensures curren seek cursor is 0,0. This is useful to ensure it's safe to copy. A seek position
// ValidateCopySeekPosition ensures the current seek cursor is at 0,0. This is useful to ensure it's safe to copy. A seek position
// elsewhere will mean a partial copy.
func ValidateCopySeekPosition(f vfs.File) error {
// validate seek is at 0,0 before doing copy
Expand Down
203 changes: 118 additions & 85 deletions backend/s3/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package s3

import (
"bytes"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"github.com/c2fo/vfs/v6"
"github.com/c2fo/vfs/v6/backend"
"github.com/c2fo/vfs/v6/mocks"
"github.com/c2fo/vfs/v6/options"
"github.com/c2fo/vfs/v6/options/delete"
Expand All @@ -28,19 +26,11 @@ type File struct {
fileSystem *FileSystem
bucket string
key string
tempFile *os.File
cursorPos int64
reader io.ReadCloser
writeBuffer *bytes.Buffer
}

// Downloader interface needed to mock S3 Downloader data access object in tests.
// The value that getDownloader builds with s3manager.NewDownloaderWithClient is used through this interface.
type Downloader interface {
// Download takes a destination io.WriterAt, a GetObjectInput and optional funcs that configure the
// s3manager.Downloader; it returns a byte count n and an error.
// NOTE(review): presumably mirrors (*s3manager.Downloader).Download — confirm against the AWS SDK.
Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(downloader *s3manager.Downloader)) (n int64, err error)

// DownloadWithContext has the same shape as Download with an aws.Context as its first argument.
DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (n int64, err error)

// DownloadWithIterator takes an aws.Context and a s3manager.BatchDownloadIterator and returns only an error.
DownloadWithIterator(ctx aws.Context, iter s3manager.BatchDownloadIterator, opts ...func(*s3manager.Downloader)) error
}

// Info Functions

// LastModified returns the LastModified property of a HEAD request to the s3 object.
Expand All @@ -66,13 +56,10 @@ func (f *File) Path() string {
// the object's HEAD through the s3 API.
func (f *File) Exists() (bool, error) {
_, err := f.getHeadObject()
code := ""
if err != nil {
code = err.(awserr.Error).Code()
}
if err != nil && (code == s3.ErrCodeNoSuchKey || code == "NotFound") {
return false, nil
} else if err != nil {
if errors.Is(err, vfs.ErrNotExist) {
return false, nil
}
return false, err
}

Expand Down Expand Up @@ -104,8 +91,8 @@ func (f *File) Location() vfs.Location {
// method if the target file is also on S3, otherwise uses io.CopyBuffer.
func (f *File) CopyToFile(file vfs.File) error {
// validate seek is at 0,0 before doing copy
if err := backend.ValidateCopySeekPosition(f); err != nil {
return err
if f.cursorPos != 0 {
return vfs.CopyToNotPossible
}

// if target is S3
Expand Down Expand Up @@ -235,18 +222,16 @@ func (f *File) Delete(opts ...options.DeleteOption) error {
// Close cleans up underlying mechanisms for reading from and writing to the file. Resets the cursor position,
// closes and discards the streaming reader if one is open, and triggers a write to s3 of anything in the
// f.writeBuffer if it has been created.
func (f *File) Close() error {
if f.tempFile != nil {
err := f.tempFile.Close()
if err != nil {
return err
}
f.cursorPos = 0

err = os.Remove(f.tempFile.Name())
if err != nil && !os.IsNotExist(err) {
// invalidate reader
if f.reader != nil {
err := f.reader.Close()
if err != nil {
return err
}

f.tempFile = nil
f.reader = nil
}

if f.writeBuffer != nil {
Expand All @@ -273,19 +258,69 @@ func (f *File) Close() error {
// Read implements the standard for io.Reader. Bytes are read from the reader returned by f.getReader(),
// and f.cursorPos is advanced by the number of bytes read whenever the underlying read returns no error.
func (f *File) Read(p []byte) (n int, err error) {
if err := f.checkTempFile(); err != nil {
// check/initialize for reader
r, err := f.getReader()
if err != nil {
return 0, err
}
return f.tempFile.Read(p)

read, err := r.Read(p)
// NOTE(review): an io.Reader may return read > 0 together with a non-nil error (e.g. io.EOF). On this
// path the bytes are returned to the caller but f.cursorPos is not advanced — confirm this is intended.
if err != nil {
return read, err
}

// keep the cursor in sync with the bytes consumed from the reader
f.cursorPos += int64(read)

return read, nil
}

// Seek implements the standard for io.Seeker. The new position is computed by seekTo from the file's size,
// the current f.cursorPos, offset and whence, and is stored in f.cursorPos. Any open reader is then closed
// and set to nil, so a later f.getReader() call opens a new one starting at the new f.cursorPos.
// Returns the new cursor position, or 0 and an error if the size lookup, seekTo, or closing the reader fails.
func (f *File) Seek(offset int64, whence int) (int64, error) {
if err := f.checkTempFile(); err != nil {
// the file size is needed by seekTo to resolve io.SeekEnd
length, err := f.Size()
if err != nil {
return 0, err
}
return f.tempFile.Seek(offset, whence)

// update cursorPos
pos, err := seekTo(int64(length), f.cursorPos, offset, whence)
if err != nil {
return 0, err
}
f.cursorPos = pos

// invalidate reader
// NOTE(review): f.cursorPos has already been updated at this point; if closing the reader fails,
// 0 is returned while the cursor stays at the new position — confirm this is acceptable.
if f.reader != nil {
err := f.reader.Close()
if err != nil {
return 0, err
}

f.reader = nil
}

return f.cursorPos, nil
}

// seekTo is a helper function for Seek. Given the length of the file, the current cursor position, the
// requested offset and whence, it returns the new absolute position. An unrecognized whence yields
// vfs.ErrSeekInvalidWhence; a resulting position below zero yields vfs.ErrSeekInvalidOffset.
func seekTo(length, position, offset int64, whence int) (int64, error) {
	// base is the reference point that offset is applied to
	var base int64

	switch whence {
	case io.SeekStart:
		// relative to the start of the file: base stays 0, so the new position is just the offset
	case io.SeekCurrent:
		base = position
	case io.SeekEnd:
		base = length
	default:
		return 0, vfs.ErrSeekInvalidWhence
	}

	target := base + offset
	if target < 0 {
		return 0, vfs.ErrSeekInvalidOffset
	}

	return target, nil
}

// Write implements the standard for io.Writer. A buffer is added to with each subsequent
Expand All @@ -305,7 +340,14 @@ func (f *File) Write(data []byte) (res int, err error) {

f.writeBuffer = bytes.NewBuffer([]byte{})
}
return f.writeBuffer.Write(data)

written, err := f.writeBuffer.Write(data)
if err != nil {
return 0, err
}
f.cursorPos += int64(written)

return written, err
}

// Touch creates a zero-length file on the vfs.File if no File exists. Update File's last modified timestamp.
Expand Down Expand Up @@ -363,7 +405,10 @@ func (f *File) getHeadObject() (*s3.HeadObjectOutput, error) {
if err != nil {
return nil, err
}
return client.HeadObject(headObjectInput)

head, err := client.HeadObject(headObjectInput)

return head, handleExistsError(err)
}

// For copy from S3-to-S3 when credentials are the same between source and target, return *s3.CopyObjectInput or error
Expand Down Expand Up @@ -424,52 +469,6 @@ func (f *File) getCopyObjectInput(targetFile *File) (*s3.CopyObjectInput, error)
return nil, nil
}

// checkTempFile makes sure f.tempFile is populated. When it is already set nothing happens; otherwise a
// temp file is obtained from copyToLocalTempReader and stored on the File. Any error from
// copyToLocalTempReader is returned unchanged.
func (f *File) checkTempFile() error {
	if f.tempFile != nil {
		return nil
	}

	tmp, err := f.copyToLocalTempReader()
	if err != nil {
		return err
	}

	f.tempFile = tmp
	return nil
}

// copyToLocalTempReader downloads the s3 object into a newly created local temp file and returns that
// file, still open. If obtaining the client or downloading fails after the temp file has been created,
// the temp file is closed and removed before the error is returned so it is not leaked.
func (f *File) copyToLocalTempReader() (*os.File, error) {
	// Create temp file
	tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s.%d", f.Name(), time.Now().UnixNano()))
	if err != nil {
		return nil, err
	}

	// discardTemp is best-effort cleanup for the failure paths below. The error that triggered the cleanup
	// is the one worth reporting, so errors from Close/Remove are deliberately ignored.
	discardTemp := func() {
		_ = tmpFile.Close()
		_ = os.Remove(tmpFile.Name())
	}

	// Create S3 Downloader, get client, and set partition size for multipart download
	var partSize int64
	if opts, ok := f.Location().FileSystem().(*FileSystem).options.(Options); ok {
		if partSize = opts.DownloadPartitionSize; partSize == 0 {
			partSize = 32 * 1024 * 1024 // 32 MB per partition default if opts.DownloadPartitionSize is 0
		}
	}

	client, err := f.fileSystem.Client()
	if err != nil {
		discardTemp()
		return nil, err
	}

	// Download file
	if _, err := getDownloader(client, partSize).Download(tmpFile, f.getObjectInput()); err != nil {
		discardTemp()
		return nil, err
	}

	// Return temp file
	return tmpFile, nil
}

// getObjectInput builds the s3.GetObjectInput addressing this file's bucket and key.
func (f *File) getObjectInput() *s3.GetObjectInput {
	input := &s3.GetObjectInput{}
	input.SetBucket(f.bucket)
	input.SetKey(f.key)

	return input
}

// TODO: need to provide an implementation-agnostic container for providing config options such as SSE
func uploadInput(f *File) *s3manager.UploadInput {
sseType := "AES256"
Expand Down Expand Up @@ -534,8 +533,42 @@ func waitUntilFileExists(file vfs.File, retries int) error {
return nil
}

var getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
return s3manager.NewDownloaderWithClient(client, func(d *s3manager.Downloader) {
d.PartSize = partSize
})
// getReader returns the reader used for streaming reads, creating and caching it in f.reader on first
// use. A new reader is backed by a ranged GetObject request that starts at f.cursorPos and runs to the end
// of the object.
//
// When S3 rejects the range with an InvalidRange error (the range starts at or beyond the end of the
// object, which includes any range on a zero-length object), an empty reader is cached and returned
// instead of the error, so that Read reports io.EOF as io.Reader callers expect.
func (f *File) getReader() (io.ReadCloser, error) {
	if f.reader != nil {
		return f.reader, nil
	}

	// Get the client
	client, err := f.fileSystem.Client()
	if err != nil {
		return nil, err
	}

	// Create the request to get the object, starting at the current cursor position
	input := new(s3.GetObjectInput).
		SetBucket(f.bucket).
		SetKey(f.key).
		SetRange(fmt.Sprintf("bytes=%d-", f.cursorPos))

	// Request the object
	result, err := client.GetObject(input)
	if err != nil {
		var awsErr awserr.Error
		if errors.As(err, &awsErr) && awsErr.Code() == "InvalidRange" {
			// nothing left to read at this position: hand back a reader that yields io.EOF immediately
			f.reader = io.NopCloser(bytes.NewReader(nil))
			return f.reader, nil
		}
		return nil, err
	}

	// Set the reader to the body of the object
	f.reader = result.Body

	return f.reader, nil
}

// handleExistsError translates "does not exist" style AWS errors into vfs.ErrNotExist. A nil error stays
// nil. An awserr.Error whose code is s3.ErrCodeNoSuchKey, s3.ErrCodeNoSuchBucket or "NotFound" becomes
// vfs.ErrNotExist. Every other error is returned unchanged.
func handleExistsError(err error) error {
	if err == nil {
		return nil
	}

	var awsErr awserr.Error
	if !errors.As(err, &awsErr) {
		return err
	}

	code := awsErr.Code()
	if code == s3.ErrCodeNoSuchKey || code == s3.ErrCodeNoSuchBucket || code == "NotFound" {
		return vfs.ErrNotExist
	}

	return err
}
Loading

0 comments on commit 17e669e

Please sign in to comment.