Skip to content

Commit

Permalink
inprogress
Browse files Browse the repository at this point in the history
  • Loading branch information
huangnauh committed Oct 31, 2024
1 parent d50bd44 commit ef91080
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 26 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,18 @@ upx put ./video /myfiles
upx put . --err-log=err.log
```

### 上传大文件的过程中同时下载

```
upx put --in-progress ./file_1G ./inprogress/file_1G
```

```
upx get --in-progress /inprogress/file_1G inprogress/file_1G
```



## upload
> 上传文件或目录,支持多文件,文件名匹配
Expand Down
8 changes: 7 additions & 1 deletion commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,19 @@ func NewGetCommand() cli.Command {
PrintErrorAndExit("max concurrent threads must between (1 - 10)")
}
if mc.Start != "" || mc.End != "" {
if c.Bool("in-progress") {
PrintErrorAndExit("get %s: --in-progress and -start/-end can't be used together", upPath)
}
session.GetStartBetweenEndFiles(upPath, localPath, mc, c.Int("w"))
} else {
session.Get(upPath, localPath, mc, c.Int("w"), c.Bool("c"))
session.Get(upPath, localPath, mc, c.Int("w"), c.Bool("c"), c.Bool("in-progress"))
}
return nil
},
Flags: []cli.Flag{
cli.IntFlag{Name: "w", Usage: "max concurrent threads (1-10)", Value: 5},
cli.BoolFlag{Name: "c", Usage: "continue download, Resume Broken Download"},
cli.BoolFlag{Name: "in-progress", Usage: "download the file being uploaded"},
cli.StringFlag{Name: "mtime", Usage: "file's data was last modified n*24 hours ago, same as linux find command."},
cli.StringFlag{Name: "start", Usage: "file download range starting location"},
cli.StringFlag{Name: "end", Usage: "file download range ending location"},
Expand Down Expand Up @@ -358,11 +362,13 @@ func NewPutCommand() cli.Command {
upPath,
c.Int("w"),
c.Bool("all"),
c.Bool("in-progress"),
)
return nil
},
Flags: []cli.Flag{
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.BoolFlag{Name: "in-progress", Usage: "upload a file that can be downloaded simultaneously"},
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
cli.StringFlag{Name: "err-log", Usage: "upload file error log to file"},
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/fatih/color v1.15.0
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.0
github.com/upyun/go-sdk/v3 v3.0.4
github.com/upyun/go-sdk/v3 v3.0.5-0.20241031024504-de08aa91940c
github.com/urfave/cli v1.22.12
github.com/vbauerster/mpb/v8 v8.5.2
golang.org/x/term v0.22.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/upyun/go-sdk/v3 v3.0.4 h1:2DCJa/Yi7/3ZybT9UCPATSzvU3wpPPxhXinNlb1Hi8Q=
github.com/upyun/go-sdk/v3 v3.0.4/go.mod h1:P/SnuuwhrIgAVRd/ZpzDWqCsBAf/oHg7UggbAxyZa0E=
github.com/upyun/go-sdk/v3 v3.0.5-0.20241031024504-de08aa91940c h1:Wer/AWhipz0U88vbivpDgrlWc0H1MgF5FOHu+rFnbG4=
github.com/upyun/go-sdk/v3 v3.0.5-0.20241031024504-de08aa91940c/go.mod h1:xtmsshnvsTP8h8iqA3+L0NWqFEJc5tUJLGXHVUVzLs8=
github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8=
github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
github.com/vbauerster/mpb/v8 v8.5.2 h1:zanzt1cZpSEG5uGNYKcv43+97f0IgEnXpuBFaMxKbM0=
Expand Down
66 changes: 44 additions & 22 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/fs"
"io/ioutil"
"log"
Expand Down Expand Up @@ -48,8 +49,9 @@ type Session struct {
updriver *upyun.UpYun
color bool

scores map[int]int
smu sync.RWMutex
scores map[int]int
smu sync.RWMutex
multipart bool

taskChan chan interface{}
}
Expand Down Expand Up @@ -319,7 +321,7 @@ func (sess *Session) getDir(upPath, localPath string, match *MatchConfig, worker
}

for i := 1; i <= MaxRetry; i++ {
e = sess.getFileWithProgress(fpath, lpath, fInfo, 1, isContinue)
e = sess.getFileWithProgress(fpath, lpath, fInfo, 1, isContinue, false)
if e == nil {
break
}
Expand Down Expand Up @@ -349,7 +351,7 @@ func (sess *Session) getDir(upPath, localPath string, match *MatchConfig, worker
return err
}

func (sess *Session) getFileWithProgress(upPath, localPath string, upInfo *upyun.FileInfo, works int, resume bool) error {
func (sess *Session) getFileWithProgress(upPath, localPath string, upInfo *upyun.FileInfo, works int, resume, inprogress bool) error {
var err error

var bar *mpb.Bar
Expand All @@ -376,12 +378,16 @@ func (sess *Session) getFileWithProgress(upPath, localPath string, upInfo *upyun
works,
func(start, end int64) ([]byte, error) {
var buffer bytes.Buffer
headers := map[string]string{
"Range": fmt.Sprintf("bytes=%d-%d", start, end),
}
if inprogress {
headers["X-Upyun-Multi-In-Progress"] = "true"
}
_, err = sess.updriver.Get(&upyun.GetObjectConfig{
Path: sess.AbsPath(upPath),
Writer: &buffer,
Headers: map[string]string{
"Range": fmt.Sprintf("bytes=%d-%d", start, end),
},
Path: sess.AbsPath(upPath),
Writer: &buffer,
Headers: headers,
})
return buffer.Bytes(), err
},
Expand All @@ -396,9 +402,14 @@ func (sess *Session) getFileWithProgress(upPath, localPath string, upInfo *upyun
return err
}

func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int, resume bool) {
func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int, resume, inprogress bool) {
upPath = sess.AbsPath(upPath)
upInfo, err := sess.updriver.GetInfo(upPath)
headers := map[string]string{}
if inprogress {
headers["X-Upyun-Multi-In-Progress"] = "true"
resume = true
}
upInfo, err := sess.updriver.GetInfoWithHeaders(upPath, headers)
if err != nil {
PrintErrorAndExit("getinfo %s: %v", upPath, err)
}
Expand All @@ -414,6 +425,9 @@ func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers i
}

if upInfo.IsDir {
if inprogress {
PrintErrorAndExit("get: %s is a directory", localPath)
}
if exist {
if !isDir {
PrintErrorAndExit("get: %s Not a directory", localPath)
Expand All @@ -432,10 +446,10 @@ func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers i
}

// 小于 100M 不开启多线程
if upInfo.Size < 1024*1024*100 {
if upInfo.Size < 1024*1024*100 || inprogress {
workers = 1
}
err := sess.getFileWithProgress(upPath, localPath, upInfo, workers, resume)
err := sess.getFileWithProgress(upPath, localPath, upInfo, workers, resume, inprogress)
if err != nil {
PrintErrorAndExit(err.Error())
}
Expand Down Expand Up @@ -483,7 +497,7 @@ func (sess *Session) GetStartBetweenEndFiles(upPath, localPath string, match *Ma
for fInfo := range fInfoChan {
fp := filepath.Join(fpath, fInfo.Name)
if (fp >= startList || startList == "") && (fp < endList || endList == "") {
sess.Get(fp, localPath, match, workers, false)
sess.Get(fp, localPath, match, workers, false, false)
} else if strings.HasPrefix(startList, fp) {
//前缀相同进入下一级文件夹,继续递归判断
if fInfo.IsDir {
Expand Down Expand Up @@ -516,16 +530,22 @@ func (sess *Session) putFileWithProgress(localPath, upPath string, localInfo os.
if IsVerbose {
if localInfo.Size() > 0 {
bar = processbar.ProcessBar.AddBar(upPath, localInfo.Size())
cfg.Reader = NewFileWrappedReader(bar, fd)
cfg.ProxyReader = func(offset int64, r io.Reader) io.Reader {
if offset > 0 {
bar.SetCurrent(offset)
}
return bar.ProxyReader(r)
}
}
} else {
log.Printf("file: %s, Start\n", upPath)
if localInfo.Size() >= MinResumePutFileSize {
cfg.UseResumeUpload = true
cfg.ResumePartSize = DefaultBlockSize
cfg.MaxResumePutTries = DefaultResumeRetry
}
}
if localInfo.Size() >= MinResumePutFileSize || sess.multipart {
cfg.UseResumeUpload = true
cfg.ResumePartSize = DefaultBlockSize
cfg.MaxResumePutTries = DefaultResumeRetry
}

err = sess.updriver.Put(cfg)
if bar != nil {
bar.EnableTriggerComplete()
Expand Down Expand Up @@ -688,9 +708,11 @@ func (sess *Session) putDir(localPath, upPath string, workers int, withIgnore bo
}

// / Put 上传单文件或单目录
func (sess *Session) Put(localPath, upPath string, workers int, withIgnore bool) {
func (sess *Session) Put(localPath, upPath string, workers int, withIgnore, inprogress bool) {
upPath = sess.AbsPath(upPath)

if inprogress {
sess.multipart = true
}
exist, isDir := false, false
if upInfo, _ := sess.updriver.GetInfo(upPath); upInfo != nil {
exist = true
Expand Down

0 comments on commit ef91080

Please sign in to comment.