Skip to content

Commit

Permalink
run test
Browse files Browse the repository at this point in the history
  • Loading branch information
gojoy committed May 8, 2018
1 parent 2b78fb0 commit 66f2c59
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 269 deletions.
2 changes: 1 addition & 1 deletion api/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (s *apiServer) PreMigration(ctx context.Context, r *types.PreMigrationReque
e.Vol = append(e.Vol, struct {
Src, Dst string
IsWrite bool
}{Src:v.Src , Dst:v.Dst , IsWrite:v.Iswrite })
}{Src:v.Src, Dst:v.Dst, IsWrite:v.Iswrite})
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
Expand Down
38 changes: 18 additions & 20 deletions supervisor/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,11 @@ func (t *MigrationTask) startMigrationTask(c *containerInfo) error {
var (
e = make(chan error)
err error
st time.Time
st time.Time
)
start := time.Now()
log.Printf("start migration task at %v\n", start.String())



log.Println("new local")
l, err := migration.NewLocalMigration(c.container)
if err != nil {
Expand All @@ -157,12 +155,12 @@ func (t *MigrationTask) startMigrationTask(c *containerInfo) error {
}

log.Println("test getmem!")
mem,err:=l.GetContainerMem()
if err!=nil {
mem, err := l.GetContainerMem()
if err != nil {
log.Println(err)
return err
}
log.Printf("mem is %v\n",mem)
log.Printf("mem is %v\n", mem)
//panic("test finish!")

log.Println("new remote")
Expand All @@ -176,26 +174,26 @@ func (t *MigrationTask) startMigrationTask(c *containerInfo) error {
go r.PreLoadImage(e, l.Imagedir)

log.Println("copy readonly to remote")
if err=l.CopyReadVolToRemote(r);err!=nil {
if err = l.CopyReadVolToRemote(r); err != nil {
log.Println(err)
return err
}

log.Println("start watch write vol")
vwather,err:=l.Watchwritevol()
if err!=nil {
vwather, err := l.Watchwritevol()
if err != nil {
log.Println(err)
return err
}

log.Println("copy write vol to remote")
if err=l.CopyWriteVolToRemote(r);err!=nil {
if err = l.CopyWriteVolToRemote(r); err != nil {
log.Println(err)
return err
}

log.Println("start precopy mem")
time.Sleep(10*time.Second)
time.Sleep(20 * time.Second)

//TODO 将hostpath的目录nfs到远程挂载 准备在本机的工作
log.Println("start nfs hostpath")
Expand Down Expand Up @@ -226,33 +224,33 @@ func (t *MigrationTask) startMigrationTask(c *containerInfo) error {
}

log.Println("get stable filelist")
stablemap,err:=l.Getstablefiles(vwather)
if err!=nil {
stablemap, err := l.Getstablefiles(vwather)
if err != nil {
log.Println(err)
return err
}

log.Println("save openfile json")
if err=l.SaveOpenFile();err!=nil {
if err = l.SaveOpenFile(); err != nil {
log.Println(err)
return err
}

log.Println("fdsync files!")
st=time.Now()
if err=l.SyncWriteFd(r,stablemap);err!=nil {
st = time.Now()
if err = l.SyncWriteFd(r, stablemap); err != nil {
log.Println(err)
return err
}
log.Printf("fdsync time:%v\n",time.Since(st))
log.Printf("fdsync time:%v\n", time.Since(st))

log.Println("directrsync!")
st=time.Now()
if err=l.DirectRsync(r);err!=nil {
st = time.Now()
if err = l.DirectRsync(r); err != nil {
log.Println(err)
return err
}
log.Printf("direct time:%v\n",time.Since(st))
log.Printf("direct time:%v\n", time.Since(st))

panic("test done!")

Expand Down
38 changes: 19 additions & 19 deletions supervisor/migration/containerUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,51 +501,51 @@ func GetMotifyFiles(path string, ctx context.Context, res motifyvols) error {

for {
select {
case events:=<-w.Events:
if _,ok:=res[events.Name];!ok {
res[events.Name]=events.Op.String()
case events := <-w.Events:
if _, ok := res[events.Name]; !ok {
res[events.Name] = events.Op.String()
}
case <-ctx.Done():

goto END
}
}
END:
log.Printf("end monitor %v\n",path)
END:
log.Printf("end monitor %v\n", path)
return nil
}

func getmap(files []string) map[string]bool {
func getmap(files []string) map[string]bool {
var (
res=make(map[string]bool)
res = make(map[string]bool)
)
for _,v:=range files {
if _,ok:=res[v];!ok {
res[v]=true
for _, v := range files {
if _, ok := res[v]; !ok {
res[v] = true
}
}
if _,ok:=res["ib_logfile1"];!ok {
if _,ok:=res["ib_logfile0"];ok {
delete(res,"ib_logfile0")

if _, ok := res["ib_logfile1"]; !ok {
if _, ok := res["ib_logfile0"]; ok {
delete(res, "ib_logfile0")
}
}

return res
}

func getstablerelatepath(files []string,vol Volumes) []string {
func getstablerelatepath(files []string, vol Volumes) []string {
var (
res=make([]string,0)
res = make([]string, 0)
)
if len(files)==0 {
if len(files) == 0 {
panic("files len is 0")
}
for _,v:=range files {
for _, v := range files {
right := strings.TrimPrefix(v, vol.src)
if len(right) != len(v) {
res = append(res, right[1:])
}
}
return res
}
}
57 changes: 36 additions & 21 deletions supervisor/migration/fdsSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"os"
"os/exec"
"strings"
"errors"
"sync"
"time"
)



//首先把数据卷拷贝到对应的远程upper目录,然后根据crit x fds文件列表,同步这些文件
//在本地监控数据卷,生成map 所有变更都存在其中,然后把上次同步过的文件从map中删除,
// 之后在目的主机的upper目录,文件只有在map中,表示以及不是最新版本,就删除
Expand All @@ -25,7 +24,7 @@ type allpath [2]string
type volpath []allpath

// remoteVolumesDirs:代表每个write数据卷在目的主机的拷贝目录
func fdsSync(checkpointDir string, remoteVolumesDirs string, vols Volumes, ip string,smap map[string]bool) error {
func fdsSync(checkpointDir string, remoteVolumesDirs string, vols Volumes, ip string, smap map[string]bool) error {
files, err := getFiles(checkpointDir)
if err != nil {
log.Println(err)
Expand All @@ -37,9 +36,9 @@ func fdsSync(checkpointDir string, remoteVolumesDirs string, vols Volumes, ip st
return err
}

log.Printf("before delete,len is %v,syncfiles is %v",len(syncfiles),syncfiles)
syncfiles=deletefromstablemap(syncfiles,smap)
log.Printf("now len is %v,syncfiles is %v\n",len(syncfiles),syncfiles)
log.Printf("before delete,len is %v,syncfiles is %v", len(syncfiles), syncfiles)
syncfiles = deletefromstablemap(syncfiles, smap)
log.Printf("now len is %v,syncfiles is %v\n", len(syncfiles), syncfiles)

if err = fastCopy(syncfiles, ip, remoteVolumesDirs, vols); err != nil {
log.Println(err)
Expand All @@ -50,17 +49,17 @@ func fdsSync(checkpointDir string, remoteVolumesDirs string, vols Volumes, ip st

func deletefromstablemap(files []string, stmap map[string]bool) []string {

if len(files)==0 {
if len(files) == 0 {
panic("files is 0")
}
if len(stmap)==0 {
if len(stmap) == 0 {
log.Println("stmap is 0")
}
for i:=0;i<len(files);i++ {
if stmap[files[i]]==true {
log.Printf("delete %v\n",files[i])
files[i]=files[len(files)-1]
files=files[:len(files)-1]
for i := 0; i < len(files); i++ {
if stmap[files[i]] == true {
log.Printf("delete %v\n", files[i])
files[i] = files[len(files)-1]
files = files[:len(files)-1]
i--
}
}
Expand All @@ -75,7 +74,7 @@ func SaveOpenFile(checkdir, path string, vol Volumes) error {
log.Println(err)
return err
}
data:= syncNeedFiles(files, vol)
data := syncNeedFiles(files, vol)
if err != nil {
log.Println(err)
return err
Expand Down Expand Up @@ -148,7 +147,7 @@ func getFiles(path string) ([]string, error) {

//找到再数据卷中的文件
func syncNeedFiles(files []string, vol Volumes) []string {
log.Printf("dst is %v\n",vol.dst)
log.Printf("dst is %v\n", vol.dst)
var (
res = make([]string, 0)
)
Expand Down Expand Up @@ -176,22 +175,38 @@ func syncNeedFiles(files []string, vol Volumes) []string {
func fastCopy(files []string, ip string, remote string, vol Volumes) error {
var (
err error
wg sync.WaitGroup
st time.Time
)
if len(files) == 0 {
log.Println("files len is 0")
return errors.New("files nil")
return nil
}

//log.Printf("f is %v\n",files)
if err = os.Chdir(vol.src); err != nil {
log.Println(err)
}

st = time.Now()

for _, v := range files {
log.Printf("fdsync:%v\n",v)
if err = RemoteCopyFileRsync(v, remote, ip); err != nil {
log.Println(err)
}
wg.Add(1)

go func(file string) {
defer wg.Done()
log.Printf("fdsync:%v\n", file)
if err = RemoteCopyFileRsync(file, remote, ip); err != nil {
log.Println(err)
}
}(v)

}

wg.Wait()

log.Printf("fastcopy time is %v\n", time.Since(st))
return nil
}

Expand All @@ -211,7 +226,7 @@ func RemoteCopyFileRsync(local, remote string, ip string) error {
//log.Printf("l is %v,r is %v,args is %v\n",local,remote,args)

cmd := exec.Command("rsync", args...)
//log.Println(cmd.Args)
log.Println(cmd.Args)

if out, err := cmd.CombinedOutput(); err != nil {
log.Printf("rsync error:%v,out:%v\n", err, string(out))
Expand Down
Loading

0 comments on commit 66f2c59

Please sign in to comment.