Skip to content

Commit

Permalink
Merge pull request #1168 from swordqiu/feature/qj-object-parallel
Browse files Browse the repository at this point in the history
fix: download/upload/copy large objects in parallel
  • Loading branch information
swordqiu authored Jan 25, 2025
2 parents eb7eb31 + 830150b commit 8e6d4dd
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 63 deletions.
309 changes: 264 additions & 45 deletions pkg/cloudprovider/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/streamutils"
"yunion.io/x/s3cli"
)

Expand Down Expand Up @@ -185,6 +187,7 @@ type SListObjectResult struct {
IsTruncated bool
}

// range starts from 0 (zero-based, inclusive offsets)
type SGetObjectRange struct {
Start int64
End int64
Expand Down Expand Up @@ -492,6 +495,28 @@ func Makedir(ctx context.Context, bucket ICloudBucket, key string) error {
}

// UploadObject uploads the content of input to bucket under key, using a
// multipart upload when sizeBytes exceeds blocksz. It is a convenience
// wrapper around UploadObjectParallel with parallel part uploading disabled,
// preserving the original sequential behavior for existing callers.
func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool) error {
	return UploadObjectParallel(ctx, bucket, key, blocksz, input, sizeBytes, cannedAcl, storageClass, meta, debug, false)
}

// uploadPartOfMultipart uploads one part of an ongoing multipart upload,
// reading at most partSize bytes from input. partIndex is zero-based; the
// backend part number is partIndex+1. offset is the byte position of this
// part within the whole object and sizeBytes is the total object size.
// It returns the etag the backend reported for the part.
//
// NOTE(review): input is a plain io.Reader positioned at the part's data —
// callers must ensure reads happen in order (or use per-part readers) when
// invoking this concurrently.
func uploadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, input io.Reader, sizeBytes int64, uploadId string, partIndex int, partSize int64, offset int64, debug bool) (string, error) {
	var startAt time.Time
	if debug {
		startAt = time.Now()
		log.Debugf("UploadPart %d %d", partIndex+1, partSize)
	}
	etag, err := bucket.UploadPart(ctx, key, uploadId, partIndex+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes)
	if err != nil {
		return "", errors.Wrapf(err, "bucket.UploadPart %d", partIndex)
	}
	if debug {
		duration := time.Since(startAt)
		rateMbps := calculateRateMbps(partSize, duration)
		// report fractional seconds; integer division by time.Second would
		// truncate sub-second parts to "0 seconds" (consistent with
		// downloadPartOfMultipart)
		log.Debugf("End of uploadPart %d %d takes %f seconds at %fMbps", partIndex+1, partSize, float64(duration)/float64(time.Second), rateMbps)
	}
	return etag, nil
}

func UploadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool, parallel bool) error {
if blocksz <= 0 {
blocksz = MAX_PUT_OBJECT_SIZEBYTES
}
Expand Down Expand Up @@ -524,24 +549,47 @@ func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz
return errors.Wrap(err, "bucket.NewMultipartUpload")
}
etags := make([]string, partCount)
offset := int64(0)
var errs []error
var wg sync.WaitGroup
for i := 0; i < int(partCount); i += 1 {
offset := int64(i) * partSize
blockSize := partSize
if i == int(partCount)-1 {
partSize = sizeBytes - partSize*(partCount-1)
blockSize = sizeBytes - partSize*(partCount-1)
}
if debug {
log.Debugf("UploadPart %d %d", i+1, partSize)
}
etag, err := bucket.UploadPart(ctx, key, uploadId, i+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes)
if err != nil {
err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
if err2 != nil {
log.Errorf("bucket.AbortMultipartUpload error %s", err2)
partIndex := i
if parallel {
wg.Add(1)
go func() {
defer wg.Done()
etag, err := uploadPartOfMultipart(ctx, bucket, key, input, sizeBytes, uploadId, partIndex, blockSize, offset, debug)
if err != nil {
errs = append(errs, errors.Wrapf(err, "uploadPartOfMultipart at %d", partIndex))
} else {
etags[partIndex] = etag
}
}()
} else {
etag, err := uploadPartOfMultipart(ctx, bucket, key, input, sizeBytes, uploadId, partIndex, blockSize, offset, debug)
if err != nil {
errs = append(errs, errors.Wrapf(err, "uploadPartOfMultipart at %d", partIndex))
break
} else {
etags[partIndex] = etag
}
return errors.Wrap(err, "bucket.UploadPart")
}
offset += partSize
etags[i] = etag
}
if parallel {
wg.Wait()
}
if len(errs) > 0 {
// upload part error
err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
if err2 != nil {
log.Errorf("bucket.AbortMultipartUpload error %s", err2)
errs = append(errs, err2)
}
return errors.Wrap(errors.NewAggregate(errs), "uploadPartOfMultipart")
}
err = bucket.CompleteMultipartUpload(ctx, key, uploadId, etags)
if err != nil {
Expand Down Expand Up @@ -592,7 +640,34 @@ func MergeMeta(src http.Header, dst http.Header) http.Header {
}

// CopyObject copies an object from srcBucket/srcKey to dstBucket/dstKey,
// using a multipart copy for objects larger than blocksz and attaching
// dstMeta to the destination object. It is a convenience wrapper around
// CopyObjectParallel with parallel part copying disabled, preserving the
// original sequential behavior for existing callers.
func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool) error {
	return CopyObjectParallel(ctx, blocksz, dstBucket, dstKey, srcBucket, srcKey, dstMeta, debug, false)
}

// copyPartOfMultipart streams the byte range rangeOpt of srcKey in srcBucket
// into part partIndex+1 of an ongoing multipart upload of dstKey in
// dstBucket, returning the etag the backend reported for the part.
// partIndex is zero-based; the backend part number is partIndex+1.
// sizeBytes is the total size of the destination object; CopyPart passes 0
// here — presumably the backends tolerate an unknown total, TODO confirm.
func copyPartOfMultipart(ctx context.Context, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange, sizeBytes int64, uploadId string, partIndex int, debug bool) (string, error) {
	partSize := rangeOpt.SizeBytes()
	var startAt time.Time
	if debug {
		startAt = time.Now()
		log.Debugf("CopyPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
	}
	srcStream, err := srcBucket.GetObject(ctx, srcKey, rangeOpt)
	if err != nil {
		return "", errors.Wrapf(err, "srcBucket.GetObject %d", partIndex)
	}
	defer srcStream.Close()
	etag, err := dstBucket.UploadPart(ctx, dstKey, uploadId, partIndex+1, io.LimitReader(srcStream, partSize), partSize, rangeOpt.Start, sizeBytes)
	if err != nil {
		return "", errors.Wrapf(err, "dstBucket.UploadPart %d", partIndex)
	}
	if debug {
		duration := time.Since(startAt)
		rateMbps := calculateRateMbps(partSize, duration)
		// report fractional seconds; integer division by time.Second would
		// truncate sub-second parts to "0 seconds" (consistent with
		// downloadPartOfMultipart)
		log.Debugf("End of copyPart %d %d range: %s (%d) takes %f seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), float64(duration)/float64(time.Second), rateMbps)
	}
	return etag, nil
}

func CopyObjectParallel(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool, parallel bool) error {
srcObj, err := GetIObject(srcBucket, srcKey)
if err != nil {
return errors.Wrap(err, "GetIObject")
Expand Down Expand Up @@ -639,38 +714,52 @@ func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstK
return errors.Wrap(err, "bucket.NewMultipartUpload")
}
etags := make([]string, partCount)
offset := int64(0)
var errs []error
var wg sync.WaitGroup
for i := 0; i < int(partCount); i += 1 {
start := int64(i) * partSize
blockSize := partSize
if i == int(partCount)-1 {
partSize = sizeBytes - partSize*(partCount-1)
blockSize = sizeBytes - partSize*(partCount-1)
}
end := start + partSize - 1
end := start + blockSize - 1
rangeOpt := SGetObjectRange{
Start: start,
End: end,
}
if debug {
log.Debugf("UploadPart %d %d range: %s (%d)", i+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
}
srcStream, err := srcBucket.GetObject(ctx, srcKey, &rangeOpt)
if err == nil {
defer srcStream.Close()
var etag string
etag, err = dstBucket.UploadPart(ctx, dstKey, uploadId, i+1, io.LimitReader(srcStream, partSize), partSize, offset, sizeBytes)
if err == nil {
etags[i] = etag
continue
partIndex := i
if parallel {
wg.Add(1)
go func() {
defer wg.Done()
etag, err := copyPartOfMultipart(ctx, dstBucket, dstKey, srcBucket, srcKey, &rangeOpt, sizeBytes, uploadId, partIndex, debug)
if err != nil {
errs = append(errs, errors.Wrapf(err, "copyPartOfMultipart %d", partIndex))
} else {
etags[partIndex] = etag
}
}()
} else {
etag, err := copyPartOfMultipart(ctx, dstBucket, dstKey, srcBucket, srcKey, &rangeOpt, sizeBytes, uploadId, partIndex, debug)
if err != nil {
errs = append(errs, errors.Wrapf(err, "copyPartOfMultipart %d", partIndex))
break
} else {
etags[partIndex] = etag
}
}
offset += partSize
if err != nil {
err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
if err2 != nil {
log.Errorf("bucket.AbortMultipartUpload error %s", err2)
}
return errors.Wrap(err, "bucket.UploadPart")
}
if parallel {
wg.Wait()
}
if len(errs) > 0 {
// upload part error
err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
if err2 != nil {
log.Errorf("bucket.AbortMultipartUpload error %s", err2)
errs = append(errs, err2)
}
return errors.Wrap(errors.NewAggregate(errs), "copyPartOfMultipart")
}
err = dstBucket.CompleteMultipartUpload(ctx, dstKey, uploadId, etags)
if err != nil {
Expand All @@ -687,17 +776,7 @@ func CopyPart(ctx context.Context,
iDstBucket ICloudBucket, dstKey string, uploadId string, partNumber int,
iSrcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange,
) (string, error) {
srcReader, err := iSrcBucket.GetObject(ctx, srcKey, rangeOpt)
if err != nil {
return "", errors.Wrap(err, "iSrcBucket.GetObject")
}
defer srcReader.Close()

etag, err := iDstBucket.UploadPart(ctx, dstKey, uploadId, partNumber, io.LimitReader(srcReader, rangeOpt.SizeBytes()), rangeOpt.SizeBytes(), 0, 0)
if err != nil {
return "", errors.Wrap(err, "iDstBucket.UploadPart")
}
return etag, nil
return copyPartOfMultipart(ctx, iDstBucket, dstKey, iSrcBucket, srcKey, rangeOpt, 0, uploadId, partNumber, false)
}

func ObjectSetMeta(ctx context.Context,
Expand Down Expand Up @@ -854,3 +933,143 @@ func SetBucketTags(ctx context.Context, iBucket ICloudBucket, mangerId string, t
}
return ret, SetTags(ctx, iBucket, mangerId, tags, true)
}

// sOffsetWriter adapts an io.WriterAt into a sequential io.Writer that
// starts writing at a fixed offset and advances the offset after every
// Write. Each downloaded part gets its own instance, so concurrent parts
// can safely write to disjoint regions of the same io.WriterAt.
// (Renamed from the typo'd sOffsetWiter; the type is unexported and only
// referenced here.)
type sOffsetWriter struct {
	writerAt io.WriterAt
	offset   int64
}

// newWriter returns an io.Writer that forwards writes to output starting
// at outputOffset.
func newWriter(output io.WriterAt, outputOffset int64) io.Writer {
	return &sOffsetWriter{
		writerAt: output,
		offset:   outputOffset,
	}
}

// Write implements io.Writer by delegating to WriteAt at the current
// offset, then advancing the offset by the number of bytes written.
func (ow *sOffsetWriter) Write(p []byte) (int, error) {
	n, err := ow.writerAt.WriteAt(p, ow.offset)
	ow.offset += int64(n)
	return n, err
}

// calculateRateMbps returns the transfer rate in megabits per second for
// sizeBytes bytes moved in duration. A non-positive duration yields 0
// rather than +Inf/NaN.
//
// The arithmetic is done entirely in float64: the previous integer form
// sizeBytes*8*int64(time.Second) overflowed int64 for objects larger than
// about 1 GiB — exactly the "large object" sizes this code handles.
func calculateRateMbps(sizeBytes int64, duration time.Duration) float64 {
	if duration <= 0 {
		return 0
	}
	return float64(sizeBytes) * 8 / duration.Seconds() / 1000 / 1000
}

// downloadPartOfMultipart fetches the byte range rangeOpt of key from
// bucket and streams it into output, returning the number of bytes
// written. partIndex is zero-based and used only for log messages; debug
// enables per-part timing logs.
func downloadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.Writer, partIndex int, debug bool) (int64, error) {
	numBytes := rangeOpt.SizeBytes()
	var begin time.Time
	if debug {
		begin = time.Now()
		log.Debugf("downloadPart %d %d range: %s (%d)", partIndex+1, numBytes, rangeOpt.String(), rangeOpt.SizeBytes())
	}
	body, err := bucket.GetObject(ctx, key, rangeOpt)
	if err != nil {
		return 0, errors.Wrap(err, "bucket.GetObject")
	}
	defer body.Close()
	result, err := streamutils.StreamPipe(body, output, false, nil)
	if err != nil {
		return 0, errors.Wrap(err, "StreamPipe")
	}
	if debug {
		elapsed := time.Since(begin)
		log.Debugf("End of downloadPart %d %d range: %s (%d) takes %f seconds at %fMbps", partIndex+1, numBytes, rangeOpt.String(), rangeOpt.SizeBytes(), float64(elapsed)/float64(time.Second), calculateRateMbps(numBytes, elapsed))
	}
	return result.Size, nil
}

// DownloadObjectParallel downloads (a byte range of) an object into an
// io.WriterAt, splitting transfers larger than blocksz into fixed-size
// parts. When parallel is true the parts are fetched concurrently, each
// writing at its own offset of output; otherwise they are fetched
// sequentially, stopping at the first error.
//
// rangeOpt selects the bytes to download; nil means the whole object.
// NOTE: a non-nil rangeOpt is normalized in place (Start/End swapped and
// clamped to the object size), mutating the caller's struct.
// outputOffset is where the first downloaded byte lands in output, and
// blocksz <= 0 selects the default part size MAX_PUT_OBJECT_SIZEBYTES.
// It returns the total number of bytes written to output.
//
// Fixes over the previous revision: errs/totalSize were mutated by the
// worker goroutines without synchronization (a data race), and the debug
// flag was ignored (literal true was passed to every part download).
func DownloadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel bool) (int64, error) {
	obj, err := GetIObject(bucket, key)
	if err != nil {
		return 0, errors.Wrap(err, "GetIObject")
	}
	if blocksz <= 0 {
		blocksz = MAX_PUT_OBJECT_SIZEBYTES
	}
	sizeBytes := obj.GetSizeBytes()
	if rangeOpt == nil {
		rangeOpt = &SGetObjectRange{
			Start: 0,
			End:   sizeBytes - 1,
		}
	} else {
		// normalize the requested range: ensure Start <= End, then clamp
		// both ends to the actual object size
		if rangeOpt.End < rangeOpt.Start {
			rangeOpt.Start, rangeOpt.End = rangeOpt.End, rangeOpt.Start
		}
		if rangeOpt.End >= sizeBytes {
			rangeOpt.End = sizeBytes - 1
		}
		if rangeOpt.Start < 0 {
			rangeOpt.Start = 0
		}
		sizeBytes = rangeOpt.SizeBytes()
	}
	if sizeBytes < blocksz {
		// small enough: download the whole range in a single request
		if debug {
			log.Debugf("too small, download object in one shot")
		}
		size, err := downloadPartOfMultipart(ctx, bucket, key, rangeOpt, newWriter(output, outputOffset), 0, debug)
		if err != nil {
			return 0, errors.Wrap(err, "downloadPartOfMultipart")
		}
		return size, nil
	}
	partSize := blocksz
	partCount := sizeBytes / partSize
	if partCount*partSize < sizeBytes {
		partCount += 1
	}
	if debug {
		log.Debugf("multipart download part count %d part size %d", partCount, partSize)
	}

	var errs []error
	var wg sync.WaitGroup
	// mu guards errs and totalSize, which the worker goroutines update
	// concurrently in parallel mode
	var mu sync.Mutex
	var totalSize int64
	for i := 0; i < int(partCount); i += 1 {
		// compute this part's destination offset and source range before
		// partSize is shortened for the final part
		dstOffset := outputOffset + int64(i)*partSize
		start := rangeOpt.Start + int64(i)*partSize
		if i == int(partCount)-1 {
			// the last part covers whatever remains
			partSize = sizeBytes - partSize*(partCount-1)
		}
		end := start + partSize - 1
		srcRangeOpt := SGetObjectRange{
			Start: start,
			End:   end,
		}

		partIndex := i
		if parallel {
			wg.Add(1)
			go func() {
				defer wg.Done()
				sz, err := downloadPartOfMultipart(ctx, bucket, key, &srcRangeOpt, newWriter(output, dstOffset), partIndex, debug)
				mu.Lock()
				defer mu.Unlock()
				if err != nil {
					errs = append(errs, errors.Wrapf(err, "downloadPartOfMultipart %d", partIndex))
				} else {
					totalSize += sz
				}
			}()
		} else {
			sz, err := downloadPartOfMultipart(ctx, bucket, key, &srcRangeOpt, newWriter(output, dstOffset), partIndex, debug)
			if err != nil {
				errs = append(errs, errors.Wrapf(err, "downloadPartOfMultipart %d", partIndex))
				break
			}
			totalSize += sz
		}
	}
	if parallel {
		wg.Wait()
	}
	if len(errs) > 0 {
		return 0, errors.Wrap(errors.NewAggregate(errs), "downloadPartOfMultipart")
	}

	return totalSize, nil
}
Loading

0 comments on commit 8e6d4dd

Please sign in to comment.