Skip to content

Commit e37122b

Browse files
authored
fix: sdk rmdir bug (#836)
* fix: sdk rmdir bug * fix: sdk rmdir bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug * fix: sdk rmdir bug && s3 mpu bug && data cache bug
1 parent a0d7805 commit e37122b

File tree

14 files changed

+230
-63
lines changed

14 files changed

+230
-63
lines changed

README_EN.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ Please refer to the quick start guide and start using PaddleFlow now
3232
## Command reference
3333
see [client command inference](docs/zh_cn/reference/client_command_reference.md) get all commands and examples.
3434
## Python sdk reference
35-
see [sdk inference](docs/zh_cn/reference/sdk_reference.md) get the sdk user manual.
35+
see [sdk inference](docs/zh_cn/reference/sdk_reference/sdk_reference.md) get the sdk user manual.
3636
## Other detailed reference
3737
[Pipeline](docs/zh_cn/reference/pipeline/overview.md) <br>
3838
[Job](docs/zh_cn/reference/job_reference.md) <br>

cmd/fs/fuse/flag/flags.go

+5-20
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ func CacheFlags(fuseConf *fuse.FuseConfig) []cli.Flag {
113113
&cli.DurationFlag{
114114
Name: "attr-timeout",
115115
Value: 1 * time.Second,
116-
Usage: "attribute cache TTL",
116+
Usage: "attribute cache TTL in kernel",
117117
Destination: &fuseConf.AttrTimeout,
118118
},
119119
&cli.DurationFlag{
120120
Name: "entry-timeout",
121121
Value: 1 * time.Second,
122-
Usage: "entry cache TTL",
122+
Usage: "entry cache TTL in kernel",
123123
Destination: &fuseConf.EntryTimeout,
124124
},
125125
&cli.IntFlag{
@@ -171,16 +171,6 @@ func MountFlags(fuseConf *fuse.FuseConfig) []cli.Flag {
171171

172172
func LinkFlags() []cli.Flag {
173173
return []cli.Flag{
174-
&cli.StringFlag{
175-
Name: "link-root",
176-
Value: "",
177-
Usage: "local root for mock link",
178-
},
179-
&cli.StringFlag{
180-
Name: "link-path",
181-
Value: "",
182-
Usage: "fs path for link",
183-
},
184174
&cli.BoolFlag{
185175
Name: "skip-check-links",
186176
Value: true,
@@ -204,11 +194,12 @@ func UserFlags(fuseConf *fuse.FuseConfig) []cli.Flag {
204194
&cli.StringFlag{
205195
Name: "user-name",
206196
Value: "root",
207-
Usage: "username",
197+
Usage: "fs server api username",
208198
},
209199
&cli.StringFlag{
210200
Name: "password",
211-
Usage: "fs server password for fs username",
201+
Value: "paddleflow",
202+
Usage: "fs server api password for fs username",
212203
},
213204
&cli.BoolFlag{
214205
Name: "allow-other",
@@ -227,12 +218,6 @@ func UserFlags(fuseConf *fuse.FuseConfig) []cli.Flag {
227218
Usage: "gid given to replace default gid",
228219
Destination: &fuseConf.Gid,
229220
},
230-
&cli.BoolFlag{
231-
Name: "raw-owner",
232-
Value: false,
233-
Usage: "show the same uid and gid to ufs",
234-
Destination: &fuseConf.RawOwner,
235-
},
236221
}
237222
}
238223

cmd/fs/fuse/flag/flags_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package flag
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
"github.com/PaddlePaddle/PaddleFlow/pkg/fs/client/fuse"
8+
)
9+
10+
func TestBasicFlags(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
want int
14+
}{
15+
{
16+
name: "num",
17+
want: 7,
18+
},
19+
}
20+
for _, tt := range tests {
21+
t.Run(tt.name, func(t *testing.T) {
22+
if got := BasicFlags(); !reflect.DeepEqual(len(got), tt.want) {
23+
t.Errorf("BasicFlags() = %v, want %v", len(got), tt.want)
24+
}
25+
})
26+
}
27+
}
28+
29+
func TestCacheFlags(t *testing.T) {
30+
type args struct {
31+
fuseConf *fuse.FuseConfig
32+
}
33+
tests := []struct {
34+
name string
35+
args args
36+
want int
37+
}{
38+
{
39+
name: "cache num",
40+
args: args{
41+
fuseConf: fuse.FuseConf,
42+
},
43+
want: 12,
44+
},
45+
}
46+
for _, tt := range tests {
47+
t.Run(tt.name, func(t *testing.T) {
48+
if got := CacheFlags(tt.args.fuseConf); !reflect.DeepEqual(len(got), tt.want) {
49+
t.Errorf("CacheFlags() = %v, want %v", len(got), tt.want)
50+
}
51+
})
52+
}
53+
}

cmd/fs/fuse/service/mount.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
_ "net/http/pprof"
2626
"os"
2727
"os/signal"
28-
"path"
2928
"strconv"
3029
"strings"
3130
"syscall"
@@ -142,7 +141,7 @@ func setup(c *cli.Context) error {
142141
metricsAddr := exposeMetricsService(c.String("server"), c.Int("metrics-service-port"))
143142
log.Debugf("mount opts: %+v, metricsAddr: %s", opts, metricsAddr)
144143
}
145-
if c.Bool("pprof-enable") {
144+
if c.Int("pprof-port") != 0 {
146145
go func() {
147146
http.ListenAndServe(fmt.Sprintf(":%d", c.Int("pprof-port")), nil)
148147
}()
@@ -260,15 +259,6 @@ func InitVFS(c *cli.Context, registry *prometheus.Registry) error {
260259
UfsType: common.LocalType,
261260
SubPath: localRoot,
262261
}
263-
linkPath, linkRoot := c.String("link-path"), c.String("link-root")
264-
if linkPath != "" && linkRoot != "" {
265-
links = map[string]common.FSMeta{
266-
path.Clean(linkPath): common.FSMeta{
267-
UfsType: common.LocalType,
268-
SubPath: linkRoot,
269-
},
270-
}
271-
}
272262
} else if c.String(schema.FuseKeyFsInfo) != "" {
273263
fs, err := utils.ProcessFSInfo(c.String(schema.FuseKeyFsInfo))
274264
if err != nil {

pkg/fs/client/cache/cache.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ func (r *rCache) readFromReadAhead(off int64, buf []byte) (bytesRead int, err er
8585
}
8686
nread, err = readAheadBuf.ReadAt(uint64(blockOff), buf[bytesRead:])
8787
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
88-
break
88+
return 0, err
8989
}
9090
bytesRead += nread
9191
blockOff += nread
9292

93-
if nread == 0 {
93+
if nread == 0 || err == io.EOF || err == io.ErrUnexpectedEOF {
9494
break
9595
}
9696
}
@@ -181,6 +181,7 @@ func (r *rCache) ReadAt(buf []byte, off int64) (n int, err error) {
181181
log.Debugf("read buffers map %v", len(r.buffers))
182182
if err == nil {
183183
n, err = r.readFromReadAhead(off, buf)
184+
log.Debugf("readFromReadAhead n is %v err %v", n, err)
184185
return
185186
} else {
186187
log.Errorf("read ahead err is %v", err)

pkg/fs/client/cache/cache_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
. "github.com/agiledragon/gomonkey/v2"
6+
"reflect"
7+
"sync"
8+
"testing"
9+
10+
"github.com/PaddlePaddle/PaddleFlow/pkg/fs/client/ufs"
11+
)
12+
13+
func Test_rCache_readFromReadAhead(t *testing.T) {
14+
type fields struct {
15+
id string
16+
flags uint32
17+
length int
18+
store *store
19+
ufs ufs.UnderFileStorage
20+
buffers ReadBufferMap
21+
bufferPool *BufferPool
22+
lock sync.RWMutex
23+
seqReadAmount uint64
24+
}
25+
type args struct {
26+
off int64
27+
buf []byte
28+
}
29+
tests := []struct {
30+
name string
31+
fields fields
32+
args args
33+
wantBytesRead int
34+
wantErr bool
35+
}{
36+
{
37+
name: "test read err",
38+
fields: fields{
39+
store: &store{
40+
conf: Config{BlockSize: 1},
41+
},
42+
id: "xx",
43+
length: 10,
44+
buffers: ReadBufferMap{1: &ReadBuffer{}},
45+
},
46+
wantBytesRead: 0,
47+
wantErr: true,
48+
args: args{
49+
off: 1,
50+
buf: make([]byte, 12),
51+
},
52+
},
53+
}
54+
for _, tt := range tests {
55+
t.Run(tt.name, func(t *testing.T) {
56+
r := &rCache{
57+
id: tt.fields.id,
58+
flags: tt.fields.flags,
59+
length: tt.fields.length,
60+
store: tt.fields.store,
61+
ufs: tt.fields.ufs,
62+
buffers: tt.fields.buffers,
63+
bufferPool: tt.fields.bufferPool,
64+
lock: tt.fields.lock,
65+
seqReadAmount: tt.fields.seqReadAmount,
66+
}
67+
if tt.name == "test read err" {
68+
var p1 = ApplyMethod(reflect.TypeOf(&ReadBuffer{}), "ReadAt",
69+
func(_ *ReadBuffer, offset uint64, p []byte) (n int, err error) {
70+
return 0, fmt.Errorf("read fail")
71+
})
72+
defer p1.Reset()
73+
}
74+
gotBytesRead, err := r.readFromReadAhead(tt.args.off, tt.args.buf)
75+
if (err != nil) != tt.wantErr {
76+
t.Errorf("readFromReadAhead() error = %v, wantErr %v", err, tt.wantErr)
77+
return
78+
}
79+
if gotBytesRead != tt.wantBytesRead {
80+
t.Errorf("readFromReadAhead() gotBytesRead = %v, want %v", gotBytesRead, tt.wantBytesRead)
81+
}
82+
})
83+
}
84+
}

pkg/fs/client/fs/file.go

+1-14
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,6 @@ type FileInfo struct {
4343
sys interface{}
4444
}
4545

46-
func NewFileInfo(attr *base.FileInfo) FileInfo {
47-
if attr == nil {
48-
return FileInfo{}
49-
}
50-
return FileInfo{
51-
path: attr.Name,
52-
size: attr.Size,
53-
mtime: attr.Mtime,
54-
isDir: attr.IsDir,
55-
mode: attr.Mode,
56-
sys: attr.Sys,
57-
}
58-
}
59-
6046
func NewFileInfoForCreate(path string, mode os.FileMode) FileInfo {
6147
return FileInfo{
6248
path: path,
@@ -300,6 +286,7 @@ func (f *File) Readdirnames(n int) ([]string, error) {
300286
ctx := meta.NewEmptyContext()
301287
entries, err := f.fs.vfs.ReadDir(ctx, f.inode, f.fh, 0)
302288
if utils.IsError(err) {
289+
log.Errorf("Readdirnames inode[%d] fh[%d]", f.inode, f.fh)
303290
return []string{}, err
304291
}
305292
dirNames := make([]string, len(entries))

pkg/fs/client/fs/file_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -731,3 +731,50 @@ func TestPathCacheAndRename(t *testing.T) {
731731
assert.Nil(t, err)
732732
assert.Equal(t, 3, len(dirs))
733733
}
734+
735+
func TestFile_Readdirnames(t *testing.T) {
736+
type fields struct {
737+
inode vfs.Ino
738+
fh uint64
739+
attr FileInfo
740+
readOffset int64
741+
writeOffset int64
742+
fs *FileSystem
743+
}
744+
type args struct {
745+
n int
746+
}
747+
tests := []struct {
748+
name string
749+
fields fields
750+
args args
751+
want []string
752+
wantErr bool
753+
}{
754+
{
755+
name: "want readdir err",
756+
fields: fields{
757+
fs: &FileSystem{
758+
vfs: &vfs.VFS{},
759+
},
760+
},
761+
want: []string{},
762+
wantErr: true,
763+
},
764+
}
765+
for _, tt := range tests {
766+
t.Run(tt.name, func(t *testing.T) {
767+
f := &File{
768+
inode: tt.fields.inode,
769+
fh: tt.fields.fh,
770+
attr: tt.fields.attr,
771+
readOffset: tt.fields.readOffset,
772+
writeOffset: tt.fields.writeOffset,
773+
fs: tt.fields.fs,
774+
}
775+
if _, err := f.Readdirnames(tt.args.n); (err != nil) != tt.wantErr {
776+
t.Errorf("Readdirnames() error = %v, wantErr %v", err, tt.wantErr)
777+
}
778+
})
779+
}
780+
}

pkg/fs/client/fs/fs.go

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func (fs *FileSystem) Open(path_ string) (*File, error) {
9999
flags := uint32(syscall.O_RDONLY)
100100
ctx := meta.NewEmptyContext()
101101
attr, ino, sysErr := fs.lookup(ctx, path_, true)
102+
log.Debugf("fs client looup inode[%v] path[%s]", ino, path_)
102103
if utils.IsError(sysErr) {
103104
return nil, sysErr
104105
}

pkg/fs/client/fs/pfs_client.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -227,24 +227,20 @@ func (c *PFSClient) removeAll(path string) error {
227227
return &os.PathError{Op: "RemoveAll", Path: path, Err: syscall.EINVAL}
228228
}
229229

230-
// Simple case: if Remove works, we're done.
231-
err := c.Remove(path)
232-
if err == nil || os.IsNotExist(err) {
233-
return nil
234-
}
235-
236230
parent, err := c.pfs.Open(path)
237231
if os.IsNotExist(err) {
238232
// If parent does not exist, base cannot exist. Fail silently
239233
return nil
240234
}
241235
if err != nil {
236+
log.Errorf("Open[%s] failed: %v", path, err)
242237
return err
243238
}
244239
defer parent.Close()
245240

246241
dirs, err := parent.Readdirnames(-1)
247242
if err != nil {
243+
log.Errorf("Readdirnames failed: %v", err)
248244
return err
249245
}
250246
for _, dir := range dirs {

pkg/fs/client/meta/meta_kv.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func (m *kvMeta) Access(ctx *Context, inode Ino, mask uint32, attr *Attr) syscal
598598
}
599599

600600
func (m *kvMeta) Lookup(ctx *Context, parent Ino, name string) (Ino, *Attr, syscall.Errno) {
601-
log.Debugf("kv meta parent Ino[%v] name [%s]", parent, name)
601+
log.Debugf("kv meta Lookup parent Ino[%v] name [%s]", parent, name)
602602
// todo:: add "." and ".."
603603
entry, err := m.get(m.entryKey(parent, name))
604604
if err != nil {
@@ -626,7 +626,7 @@ func (m *kvMeta) Lookup(ctx *Context, parent Ino, name string) (Ino, *Attr, sysc
626626
info, err := ufs_.GetAttr(path)
627627
now := time.Now()
628628
if err != nil {
629-
log.Debugf("[vfs-lookup] GetAttr failed: %v with path[%s] name[%s] and absolutePath[%s]", err, path, name, absolutePath)
629+
log.Debugf("[vfs-lookup] Lookup GetAttr failed: %v with path[%s] name[%s] and absolutePath[%s]", err, path, name, absolutePath)
630630
return err
631631
}
632632
if isLink {

0 commit comments

Comments
 (0)