diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 75c4e02..e5f8033 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -501,17 +501,17 @@ func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.St return t, nil } -func (s *apiServer)Migration(ctx context.Context, r *types.MigrationRequest) (*types.MigrationResponse, error) { - e:=&supervisor.MigrationTask{} +func (s *apiServer) Migration(ctx context.Context, r *types.MigrationRequest) (*types.MigrationResponse, error) { + e := &supervisor.MigrationTask{} e.WithContext(ctx) - e.Id=r.Id - e.TargetMachine= struct { + e.Id = r.Id + e.TargetMachine = struct { Host string Port uint32 - }{Host:r.Targetmachine.Ip , Port:r.Targetmachine.Port } + }{Host: r.Targetmachine.Ip, Port: r.Targetmachine.Port} s.sv.SendTask(e) - if err:=<-e.ErrorCh();err!=nil { - return nil,err + if err := <-e.ErrorCh(); err != nil { + return nil, err } - return &types.MigrationResponse{},nil -} \ No newline at end of file + return &types.MigrationResponse{}, nil +} diff --git a/containerd-shim/main.go b/containerd-shim/main.go index 6b59f4e..46a2430 100644 --- a/containerd-shim/main.go +++ b/containerd-shim/main.go @@ -9,8 +9,8 @@ import ( "runtime" "syscall" - "github.com/containerd/containerd/osutils" "github.com/containerd/console" + "github.com/containerd/containerd/osutils" ) func writeMessage(f *os.File, level string, err error) { diff --git a/containerd-shim/process.go b/containerd-shim/process.go index a9cdd4b..5101d3e 100644 --- a/containerd-shim/process.go +++ b/containerd-shim/process.go @@ -15,9 +15,9 @@ import ( "syscall" "time" + "github.com/containerd/console" "github.com/containerd/containerd/osutils" "github.com/containerd/containerd/specs" - "github.com/containerd/console" runc "github.com/containerd/go-runc" ) diff --git a/containerd-shim/process_linux.go b/containerd-shim/process_linux.go index eed0c62..82dc458 100644 --- a/containerd-shim/process_linux.go +++ b/containerd-shim/process_linux.go @@ -9,8 +9,8 @@ import ( "syscall" "time" - "github.com/containerd/containerd/osutils" "github.com/containerd/console" + "github.com/containerd/containerd/osutils" runc "github.com/containerd/go-runc" "github.com/tonistiigi/fifo" "golang.org/x/net/context" diff --git a/ctr/checkpoint_linux.go b/ctr/checkpoint_linux.go index 5b119ff..4aaaeaf 100644 --- a/ctr/checkpoint_linux.go +++ b/ctr/checkpoint_linux.go @@ -5,8 +5,8 @@ import ( "os" "text/tabwriter" - "github.com/urfave/cli" "github.com/containerd/containerd/api/grpc/types" + "github.com/urfave/cli" netcontext "golang.org/x/net/context" ) diff --git a/ctr/events.go b/ctr/events.go index 6bc33f6..f7eeefa 100644 --- a/ctr/events.go +++ b/ctr/events.go @@ -6,9 +6,9 @@ import ( "text/tabwriter" "time" - "github.com/urfave/cli" "github.com/containerd/containerd/api/grpc/types" "github.com/golang/protobuf/ptypes" + "github.com/urfave/cli" netcontext "golang.org/x/net/context" ) diff --git a/ctr/migration.go b/ctr/migration.go index 138682c..5eb5605 100644 --- a/ctr/migration.go +++ b/ctr/migration.go @@ -1,46 +1,46 @@ package main import ( - "github.com/urfave/cli" "fmt" - "os" - netcontext "golang.org/x/net/context" "github.com/containerd/containerd/api/grpc/types" + "github.com/urfave/cli" + netcontext "golang.org/x/net/context" + "os" ) -var migrationCommand=cli.Command{ - Name:"migration", - Usage:"migration containers", - ArgsUsage:" || mysql 192.168.18.2 9001", - Flags:[]cli.Flag{ +var migrationCommand = cli.Command{ + Name: "migration", + Usage: "migration containers", + ArgsUsage: " || mysql 192.168.18.2 9001", + Flags: []cli.Flag{ cli.StringFlag{ - Name:"host,H", - Usage:"host ip address", + Name: "host,H", + Usage: "host ip address", }, cli.UintFlag{ - Name:"port,p", - Usage:"host port", + Name: "port,p", + Usage: "host port", }, }, Action: func(context *cli.Context) error { - if err:=checkArgs(context,1);err!=nil { + if err := checkArgs(context, 1); err != nil { return err } - id:=context.Args().First() - ip:=context.String("host") - port:=context.Uint("port") - fmt.Printf("id:%v, ip %v, port:%v\n",id,ip,port) - c:=getClient(context) - machine:=&types.TargetMachine{ - Ip:ip, - Port:uint32(port), + id := context.Args().First() + ip := context.String("host") + port := context.Uint("port") + fmt.Printf("id:%v, ip %v, port:%v\n", id, ip, port) + c := getClient(context) + machine := &types.TargetMachine{ + Ip: ip, + Port: uint32(port), } - _,err:=c.Migration(netcontext.Background(),&types.MigrationRequest{ - Id:id, - Targetmachine:machine, + _, err := c.Migration(netcontext.Background(), &types.MigrationRequest{ + Id: id, + Targetmachine: machine, }) - if err!=nil { + if err != nil { fmt.Println(err) return err } @@ -51,12 +51,12 @@ var migrationCommand=cli.Command{ func checkArgs(context *cli.Context, expected int) error { var err error - cmdName:=context.Command.Name - fmt.Printf("nums is %v\n",context.NArg()) - if context.NArg()!=expected { + cmdName := context.Command.Name + fmt.Printf("nums is %v\n", context.NArg()) + if context.NArg() != expected { err = fmt.Errorf("%s: %q requires exactly %d argument(s)", os.Args[0], cmdName, expected) } - if err!=nil { + if err != nil { fmt.Printf("Incorrect Usage.\n\n") return err } diff --git a/supervisor/migration.go b/supervisor/migration.go index 9be7871..92e6e5c 100644 --- a/supervisor/migration.go +++ b/supervisor/migration.go @@ -1,21 +1,21 @@ package supervisor import ( - "time" - "github.com/sirupsen/logrus" "errors" "fmt" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/supervisor/migration" + "github.com/sirupsen/logrus" "net" "strings" - "github.com/containerd/containerd/supervisor/migration" + "time" + ) type MigrationTask struct { baseTask TargetMachine Id string - } type TargetMachine struct { @@ -40,22 +40,21 @@ type TargetMachine struct { // CheckpointDir string //} - func (s *Supervisor) StartMigration(t *MigrationTask) error { - startTime:=time.Now() + startTime := time.Now() fmt.Println("begin Migration") - logrus.Printf("startMigration %v\n",startTime) - c,err:=t.checkContainers(s) - if err!=nil { + logrus.Printf("startMigration %v\n", startTime) + c, err := t.checkContainers(s) + if err != nil { fmt.Println(err) return err } - if err=t.checkTargetMachine(s);err!=nil { + if err = t.checkTargetMachine(s); err != nil { return err } - if err=t.startMigration(c,s);err!=nil { + if err = t.startMigration(c, s); err != nil { return err } //logrus.Printf("container %v\n",i.container.ID()) @@ -63,28 +62,28 @@ func (s *Supervisor) StartMigration(t *MigrationTask) error { return nil } -func (t *MigrationTask)checkContainers(s *Supervisor) (*containerInfo,error) { - i,ok:=s.containers[t.Id] +func (t *MigrationTask) checkContainers(s *Supervisor) (*containerInfo, error) { + i, ok := s.containers[t.Id] if !ok { - return nil,MigrationWriteErr(fmt.Sprintf("Container %v Not Exist\n",t.Id)) + return nil, MigrationWriteErr(fmt.Sprintf("Container %v Not Exist\n", t.Id)) } - if i.container.State()!=runtime.Running { - return nil,MigrationWriteErr("Container not running") + if i.container.State() != runtime.Running { + return nil, MigrationWriteErr("Container not running") } - return i,nil + return i, nil } func (t *MigrationTask) checkTargetMachine(s *Supervisor) error { - ip:=t.Host - addrs,err:=net.InterfaceAddrs() - if err!=nil { + ip := t.Host + addrs, err := net.InterfaceAddrs() + if err != nil { return MigrationWriteErr(err.Error()) } - for _,addr:=range addrs { + for _, addr := range addrs { - ips:=strings.SplitN(addr.String(),"/",2) - fmt.Printf("network:%v,string:%v,splite:%v\n",addr.Network(),addr.String(),ips[0]) - if ips[0]==ip { + ips := strings.SplitN(addr.String(), "/", 2) + fmt.Printf("network:%v,string:%v,splite:%v\n", addr.Network(), addr.String(), ips[0]) + if ips[0] == ip { return MigrationWriteErr("Cannot Migration Localhost Machine") } } @@ -92,38 +91,54 @@ func (t *MigrationTask) checkTargetMachine(s *Supervisor) error { } func MigrationWriteErr(w string) error { - return errors.New(fmt.Sprintf("miration failed:%v",w)) + return errors.New(fmt.Sprintf("Miration Failed:%v", w)) } -func (t *MigrationTask) startCopyImage(c *containerInfo) error { - image,err:=migration.NewImage(c.container) - if err!=nil { +func (t *MigrationTask) startCopyImage(c *containerInfo) error { + image, err := migration.NewImage(c.container) + if err != nil { return err } image.Path() return nil } -func (t *MigrationTask) startMigration(c *containerInfo,s *Supervisor) error { - l,err:=migration.NewLocalMigration(c.container) +func (t *MigrationTask) startMigration(c *containerInfo, s *Supervisor) error { + var ( + e chan error + err error + ) + + l, err := migration.NewLocalMigration(c.container) + if err != nil { + return MigrationWriteErr(err.Error()) + } + r,err:=migration.NewRemoteMigration(t.Host,t.Id,t.Port) if err!=nil { - return err + return MigrationWriteErr(err.Error()) } - if err=l.DoCheckpoint();err!=nil { + + go r.PreLoadImage(e,l.Imagedir) + + if err = l.DoCheckpoint(); err != nil { return err } - if err=l.DoneCheckpoint();err!=nil { + if err = l.DoneCheckpoint(); err != nil { return err } + if err=<-e;err!=nil { + return MigrationWriteErr(err.Error()) + } //r,_:=migration.NewRemoteMigration(t,l) - - logrus.Println("begin restore") + if err=r.DoRestore();err!=nil { + return MigrationWriteErr(err.Error()) + } + logrus.Println("done restore") //go r.Dorestore(s) return nil } - // ////创建本地dump的目录 //func newLocalMigration(c *containerInfo) (*localMigration, error) { @@ -192,4 +207,4 @@ func (t *MigrationTask) startMigration(c *containerInfo,s *Supervisor) error { // //fmt.Println(re.Container.Status()) // fmt.Println("after restore") // return nil -//} \ No newline at end of file +//} diff --git a/supervisor/migration/containerUtils.go b/supervisor/migration/containerUtils.go index 2a06fed..9268630 100644 --- a/supervisor/migration/containerUtils.go +++ b/supervisor/migration/containerUtils.go @@ -1,24 +1,24 @@ package migration import ( + "encoding/json" + "errors" + "fmt" + "github.com/containerd/containerd/api/grpc/types" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/specs" - "os" - "path/filepath" - "encoding/json" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" "google.golang.org/grpc" - "github.com/containerd/containerd/api/grpc/types" "google.golang.org/grpc/grpclog" + "io" "io/ioutil" "log" - "time" "net" - "fmt" - "github.com/pkg/sftp" - "golang.org/x/crypto/ssh" - "errors" - "io" + "os" + "path/filepath" "strings" + "time" ) var ( @@ -26,77 +26,75 @@ var ( ) func init() { - glog=log.New(os.Stderr,"",log.Lshortfile) + glog = log.New(os.Stderr, "", log.Lshortfile) } -func LoadSpec(c runtime.Container) (*specs.Spec,error) { +func LoadSpec(c runtime.Container) (*specs.Spec, error) { var spec specs.Spec - f,err:=os.Open(filepath.Join(c.Path(),"config.json")) - if err!=nil { - return nil,err + f, err := os.Open(filepath.Join(c.Path(), "config.json")) + if err != nil { + return nil, err } defer f.Close() if err := json.NewDecoder(f).Decode(&spec); err != nil { return nil, err } - return &spec,nil + return &spec, nil } -func GetClient(ip string,port uint32) (types.APIClient,error) { - bindSpec:=fmt.Sprintf("tcp://%v:%d",ip,port) +func GetClient(ip string, port uint32) (types.APIClient, error) { + bindSpec := fmt.Sprintf("tcp://%v:%d", ip, port) grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) - dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(1*time.Second)} - dialOpts=append(dialOpts, + dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(1 * time.Second)} + dialOpts = append(dialOpts, grpc.WithDialer(func(s string, duration time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp",fmt.Sprintf("%v:%d",ip,port),duration) + return net.DialTimeout("tcp", fmt.Sprintf("%v:%d", ip, port), duration) }, )) conn, err := grpc.Dial(bindSpec, dialOpts...) - if err!=nil { - return nil,err + if err != nil { + return nil, err } - return types.NewAPIClient(conn),nil + return types.NewAPIClient(conn), nil } +func GetSftpClient(user, passwd, host string, port uint32) (*sftp.Client, error) { -func GetSftpClient(user,passwd,host string,port uint32) (*sftp.Client,error) { - - auth:=make([]ssh.AuthMethod,0) - auth=append(auth,ssh.Password(passwd)) - addrConfig:=&ssh.ClientConfig{ - User:user, - Auth:auth, - Timeout:10*time.Second, - HostKeyCallback:ssh.InsecureIgnoreHostKey(), + auth := make([]ssh.AuthMethod, 0) + auth = append(auth, ssh.Password(passwd)) + addrConfig := &ssh.ClientConfig{ + User: user, + Auth: auth, + Timeout: 10 * time.Second, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), } - addr:=fmt.Sprintf("%s:%d",host,port) + addr := fmt.Sprintf("%s:%d", host, port) - sshClient,err:=ssh.Dial("tcp",addr,addrConfig) - if err!=nil { - return nil,err + sshClient, err := ssh.Dial("tcp", addr, addrConfig) + if err != nil { + return nil, err } - - sftpClient,err:=sftp.NewClient(sshClient) - if err!=nil { - return nil,err + sftpClient, err := sftp.NewClient(sshClient) + if err != nil { + return nil, err } - return sftpClient,nil + return sftpClient, nil } //通过本地目录得到远程目录 把目录路径的docker变为migration func PathToRemote(s string) (string, error) { - if len(s)<15 { - return "",errors.New("local Path illegal\n") + if len(s) < 15 { + return "", errors.New("local Path illegal\n") } - ss:=[]byte(s) - head:=ss[:9] - tail:=ss[15:] - res:=string(head)+"migration"+string(tail) + ss := []byte(s) + head := ss[:9] + tail := ss[15:] + res := string(head) + "migration" + string(tail) //fmt.Println("res is:",res) - return res,nil + return res, nil } @@ -106,45 +104,45 @@ func RemoteCopyDir(localPath, remotePath string, c *sftp.Client) error { err error ) - if err=RemoteMkdirAll(remotePath,c);err!=nil { + if err = RemoteMkdirAll(remotePath, c); err != nil { return err } - if err=os.Chdir(localPath);err!=nil { + if err = os.Chdir(localPath); err != nil { return err } - buf:=make([]byte,512) + buf := make([]byte, 512) - err=filepath.Walk(".", func(path string, info os.FileInfo, err error) error { - if err!=nil { + err = filepath.Walk(".", func(path string, info os.FileInfo, err error) error { + if err != nil { panic(err) return nil } if info.IsDir() { - rpath:=filepath.Join(remotePath,path) + rpath := filepath.Join(remotePath, path) - if err=c.Mkdir(rpath);err!=nil { + if err = c.Mkdir(rpath); err != nil { //panic(err) glog.Println(err) return err } - + } else { - dstf,err:=c.Create(filepath.Join(remotePath,path)) - if err!=nil { + dstf, err := c.Create(filepath.Join(remotePath, path)) + if err != nil { return err } defer dstf.Close() - srcf,err:=os.Open(filepath.Join(localPath,path)) - if err!=nil { + srcf, err := os.Open(filepath.Join(localPath, path)) + if err != nil { return err } defer srcf.Close() - _,err=io.CopyBuffer(dstf,srcf,buf) - if err!=nil { + _, err = io.CopyBuffer(dstf, srcf, buf) + if err != nil { //panic(err) glog.Println(err) return err @@ -152,7 +150,7 @@ func RemoteCopyDir(localPath, remotePath string, c *sftp.Client) error { } return nil }) - if err!=nil { + if err != nil { glog.Println(err) return err } @@ -161,16 +159,16 @@ func RemoteCopyDir(localPath, remotePath string, c *sftp.Client) error { //创建所有的父文件夹,便于后续的传输 func RemoteMkdirAll(rpath string, c *sftp.Client) error { - ps:=strings.SplitAfter(rpath,"/") - root:="" - for _,v:=range ps[:len(ps)-1] { + ps := strings.SplitAfter(rpath, "/") + root := "" + for _, v := range ps[:len(ps)-1] { - root=root+v + root = root + v - if _,err:=c.Stat(root);err!=nil { - if err==os.ErrNotExist { + if _, err := c.Stat(root); err != nil { + if err == os.ErrNotExist { //glog.Printf("dir %v not exist,we create it\n",root) - if err:=c.Mkdir(root);err!=nil { + if err := c.Mkdir(root); err != nil { return err } } else { diff --git a/supervisor/migration/image.go b/supervisor/migration/image.go index 0cb4b08..6139eee 100644 --- a/supervisor/migration/image.go +++ b/supervisor/migration/image.go @@ -1,117 +1,114 @@ package migration - import ( - "github.com/containerd/containerd/runtime" "errors" - "strings" - "path/filepath" - "os" - "io/ioutil" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/specs" "github.com/pkg/sftp" + "io/ioutil" + "os" + "path/filepath" + "strings" ) - -const Driver = "overlay2" -const DriverDir = "/var/lib/docker/overlay2" - - +const Driver = "overlay2" +const DriverDir = "/var/lib/docker/overlay2" type Image struct { runtime.Container - spce specs.Spec - bundle string + spce specs.Spec + bundle string mountType string - lowerRO []string - upperRD string + lowerRO []string + upperRD string } -func NewImage(c runtime.Container) (*Image,error) { - spec,err:=LoadSpec(c) - if err!=nil { - return nil,err +// 解析overlay2镜像的lower层(只读)和upper层(读写) +func NewImage(c runtime.Container) (*Image, error) { + + spec, err := LoadSpec(c) + if err != nil { + return nil, err } if spec.Root.Readonly { - return nil,errors.New("Cannot Migration Readonly Containers\n") + return nil, errors.New("Cannot Migration Readonly Containers\n") } - path:=spec.Root.Path - if !strings.Contains(path,Driver) { - return nil,errors.New("Only Support Overlay2\n") + path := spec.Root.Path + if !strings.Contains(path, Driver) { + return nil, errors.New("Only Support Overlay2\n") } - tmp:=strings.Split(path,"/") - imageid:=tmp[len(tmp)-2] - lower,err:=GetDir(imageid) - if err!=nil { - return nil,err + tmp := strings.Split(path, "/") + imageid := tmp[len(tmp)-2] + lower, err := GetDir(imageid) + if err != nil { + return nil, err } - s,err:=LoadSpec(c) - if err!=nil { - return nil,err + s, err := LoadSpec(c) + if err != nil { + return nil, err } - i:=&Image{} - i.spce=*s - i.upperRD=filepath.Join(DriverDir,imageid,"diff") - i.lowerRO=lower - i.Container=c - i.bundle=c.Path() - i.mountType=Driver - return i,nil + i := &Image{} + i.spce = *s + i.upperRD = filepath.Join(DriverDir, imageid, "diff") + i.lowerRO = lower + i.Container = c + i.bundle = c.Path() + i.mountType = Driver + return i, nil } - func GetDir(imageID string) ([]string, error) { - fp,err:=os.Open(filepath.Join(DriverDir,imageID,"lower")) - if err!=nil { - return nil,err + fp, err := os.Open(filepath.Join(DriverDir, imageID, "lower")) + if err != nil { + return nil, err } defer fp.Close() - lowerContext,err:=ioutil.ReadAll(fp) - if err!=nil { - return nil,err + lowerContext, err := ioutil.ReadAll(fp) + if err != nil { + return nil, err } - nowdir,err:=os.Getwd() - if err!=nil { - return nil,err + nowdir, err := os.Getwd() + if err != nil { + return nil, err } - os.Chdir(filepath.Join(DriverDir,"l")) - res:=make([]string,0) - lowers:=strings.Split(string(lowerContext),":") - - for _,v:=range lowers { - lpath,err:=os.Readlink(filepath.Join(DriverDir,v)) - if err!=nil { - return nil,err + os.Chdir(filepath.Join(DriverDir, "l")) + res := make([]string, 0) + lowers := strings.Split(string(lowerContext), ":") + + for _, v := range lowers { + lpath, err := os.Readlink(filepath.Join(DriverDir, v)) + if err != nil { + return nil, err } - abs,err:=filepath.Abs(lpath) - if err!=nil { - return nil,err + abs, err := filepath.Abs(lpath) + if err != nil { + return nil, err } - res=append(res,abs) + res = append(res, abs) } os.Chdir(nowdir) - return res,nil + return res, nil } func (i *Image) PreCopyImage(c *sftp.Client) error { - for _,v:=range i.lowerRO { + for _, v := range i.lowerRO { - remotePath,err:=PathToRemote(v) - if err!=nil { + remotePath, err := PathToRemote(v) + if err != nil { return err } - glog.Printf("v :%v,r %v\n",v,remotePath) - _,err=c.Stat(remotePath) - if err!=nil { + glog.Printf("v :%v,r %v\n", v, remotePath) + _, err = c.Stat(remotePath) + if err != nil { //TODO 远程不存在该文件,则传输过去 - if err==os.ErrNotExist { + if err == os.ErrNotExist { //fmt.Printf("begin copy %v to %v\n",v,remotePath) - if err=RemoteCopyDir(v,remotePath,c);err!=nil { + if err = RemoteCopyDir(v, remotePath, c); err != nil { return err } } else { @@ -123,5 +120,3 @@ func (i *Image) PreCopyImage(c *sftp.Client) error { } return nil } - - diff --git a/supervisor/migration/localMigration.go b/supervisor/migration/localMigration.go index 1efa8ab..35ef09b 100644 --- a/supervisor/migration/localMigration.go +++ b/supervisor/migration/localMigration.go @@ -2,63 +2,78 @@ package migration import ( "github.com/containerd/containerd/runtime" - "path/filepath" "os" + "path/filepath" //"github.com/containerd/containerd/supervisor" "errors" + "fmt" ) const MigrationDir = "/run/migration" -const DumpAll = "fullDump" +const DumpAll = "fullDump" type localMigration struct { runtime.Container - Rootfs string - Bundle string - CheckpointDir string - IsDump bool - image *Image + Rootfs string + Bundle string + CheckpointDir string + CheckpointName string + IsDump bool + Imagedir *Image } func NewLocalMigration(c runtime.Container) (*localMigration, error) { - l:=&localMigration{} - l.Bundle=c.Path() - l.Container=c - l.CheckpointDir=filepath.Join(MigrationDir,c.ID()) - l.IsDump=false + i, err := NewImage(c) + if err != nil { + return nil, err + } + + l := &localMigration{} + l.Bundle = c.Path() + l.Container = c + l.CheckpointDir = filepath.Join(MigrationDir, c.ID()) + l.IsDump = false + l.Imagedir = i + l.CheckpointName = DumpAll - if err:=os.MkdirAll(l.CheckpointDir,0666);err!=nil { - return nil,err + if err := os.MkdirAll(l.CheckpointDir, 0666); err != nil { + return nil, err } - return l,nil + return l, nil } +//本地进行checkpoint func (l *localMigration) DoCheckpoint() error { - doCheckpoint:=runtime.Checkpoint{ - Name:DumpAll, - Exit:false, - TCP:true, - Shell:true, - UnixSockets:true, - EmptyNS:[]string{"network"}, + doCheckpoint := runtime.Checkpoint{ + Name: l.CheckpointName, + Exit: false, + TCP: true, + Shell: true, + UnixSockets: true, + EmptyNS: []string{"network"}, } - return l.Checkpoint(doCheckpoint,l.CheckpointDir) + return l.Checkpoint(doCheckpoint, l.CheckpointDir) } -func (l *localMigration)DoneCheckpoint() error { +func (l *localMigration) DoneCheckpoint() error { if l.IsDump { return errors.New("recheckpoint") } - l.IsDump=true + l.IsDump = true return nil } -func (l *localMigration)loadImage(image *Image) error { - i,err:=NewImage(l.Container) - if err!=nil { + +//把本地的checkpoint文件夹拷贝到远程主机 +func (l *localMigration) CopyCheckPointToRemote(r *remoteMigration) error { + if r == nil { + return fmt.Errorf("Err: remote nil\n ") + } + + if err := RemoteCopyDir(l.CheckpointDir, r.CheckpointDir, r.sftpClient); err != nil { + glog.Println(err) return err } - l.image=i return nil -} \ No newline at end of file +} diff --git a/supervisor/migration/remoteMigration.go b/supervisor/migration/remoteMigration.go index f24712f..5851047 100644 --- a/supervisor/migration/remoteMigration.go +++ b/supervisor/migration/remoteMigration.go @@ -1,91 +1,138 @@ package migration import ( - //"github.com/containerd/containerd/supervisor" - //"github.com/sirupsen/logrus" - //"fmt" - netcontext "golang.org/x/net/context" - //"github.com/containerd/containerd/runtime" + "encoding/json" + "fmt" "github.com/containerd/containerd/api/grpc/types" + "github.com/containerd/containerd/specs" + "github.com/pkg/sftp" + "github.com/sirupsen/logrus" + netcontext "golang.org/x/net/context" + "os" "path/filepath" "time" - "github.com/sirupsen/logrus" - "github.com/pkg/sftp" + ) const STDIO = "/dev/null" -const RUNTIMR = "runc" -const LoginUser = "root" -const LoginPasswd = "123456" -const RemoteCheckpointDir = "/var/lib/migration/checkpoint" +const RUNTIMR = "runc" +const LoginUser = "root" +const LoginPasswd = "123456" +const RemoteCheckpointDir = "/var/lib/migration/checkpoint" // type remoteMigration struct { - Id string - Rootfs string - Bundle string + Id string + Rootfs string + Bundle string CheckpointDir string - ip string - port uint32 - clienApi types.APIClient - sftpClient *sftp.Client + CheckpointName string + ip string + port uint32 + clienApi types.APIClient + sftpClient *sftp.Client + spec specs.Spec } -func NewRemoteMigration(ip,id string,port uint32) (*remoteMigration,error) { - c,err:=GetClient(ip,port) - if err!=nil { - return nil,err +func NewRemoteMigration(ip, id string, port uint32) (*remoteMigration, error) { + c, err := GetClient(ip, port) + if err != nil { + return nil, err } - sc,err:=GetSftpClient(LoginUser,LoginPasswd,ip,port) - if err!=nil { - return nil,err + sc, err := GetSftpClient(LoginUser, LoginPasswd, ip, port) + if err != nil { + return nil, err } - r:=&remoteMigration{ - Id:id+"copy", - ip:ip, - port:port, - clienApi:c, - sftpClient:sc, + r := &remoteMigration{ + Id: id + "copy", + ip: ip, + CheckpointDir: RemoteCheckpointDir, + CheckpointName:DumpAll, + port: port, + clienApi: c, + sftpClient: sc, } - return r,nil + return r, nil } +//在远程主机进行恢复 func (r *remoteMigration) DoRestore() error { - bpath,err:=filepath.Abs(r.Bundle) - if err!=nil { + bpath, err := filepath.Abs(r.Bundle) + if err != nil { return nil } - req:=&types.CreateContainerRequest{ - Id:r.Id, - BundlePath:bpath, - Checkpoint:DumpAll, - CheckpointDir:r.CheckpointDir, - Stdin:STDIO, - Stdout:STDIO, - Stderr:STDIO, - Runtime:RUNTIMR, + req := &types.CreateContainerRequest{ + Id: r.Id, + BundlePath: bpath, + Checkpoint: DumpAll, + CheckpointDir: r.CheckpointDir, + Stdin: STDIO, + Stdout: STDIO, + Stderr: STDIO, + Runtime: RUNTIMR, } - if _,err=r.clienApi.CreateContainer(netcontext.Background(),req);err!=nil { - logrus.Printf("remote restore err:%v\n",err) + //runc create + if _, err = r.clienApi.CreateContainer(netcontext.Background(), req); err != nil { + logrus.Printf("remote restore err:%v\n", err) return err } - time.Sleep(2*time.Second) - if _,err=r.clienApi.UpdateProcess(netcontext.Background(),&types.UpdateProcessRequest{ - Id:r.Id, - Pid:"Init", - CloseStdin:true, - });err!=nil { + time.Sleep(2 * time.Second) + //runc start + if _, err = r.clienApi.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{ + Id: r.Id, + Pid: "Init", + CloseStdin: true, + }); err != nil { return err } return nil } -func (r *remoteMigration) PreLoadImage(image *Image) error { +func (r *remoteMigration) PreLoadImage(e chan error,image *Image) { + + err:=image.PreCopyImage(r.sftpClient) + if err!=nil { + glog.Println(err) + } + e<-err + + +} + +//在远程主机创建对应的config.json文件 +func (r *remoteMigration) setSpec(l *localMigration) error { + if l == nil { + return fmt.Errorf("Err: local is nil\n") + } + rspec, err := LoadSpec(l.Container) + if err != nil { + glog.Println(err) + return err + } + rspec.Root.Path = r.Rootfs + rspec.Root.Readonly = false - return image.PreCopyImage(r.sftpClient) + if _, err := r.sftpClient.Stat(filepath.Join(r.Bundle, "config.json")); err != nil { + if err == os.ErrNotExist { + if specf, err := r.sftpClient.Create(filepath.Join(r.Bundle, "config.json")); err != nil { + glog.Println(err) + return err + } else { + if err = json.NewEncoder(specf).Encode(rspec); err != nil { + glog.Println(err) + return err + } + } + } else { + glog.Println(err) + return err + } + } + return fmt.Errorf("Remote Has Config.json\n") } + //func NewRemoteMigration(t *supervisor.MigrationTask,l *localMigration) (*remoteMigration,error) { // return &remoteMigration{ // Bundle:l.Bundle, diff --git a/supervisor/migration/utils_test.go b/supervisor/migration/utils_test.go index 735147c..8ba28f0 100644 --- a/supervisor/migration/utils_test.go +++ b/supervisor/migration/utils_test.go @@ -49,7 +49,6 @@ func TestImage_PreCopyImage(t *testing.T) { lowerRO: lower, } - c, err := GetSftpClient(LoginUser, LoginPasswd, "192.168.18.128", 22) if err != nil { t.Errorf("sftp err:%v\n", err) @@ -106,11 +105,11 @@ func TestRemoteCopyDir(t *testing.T) { func TestRemoteMkdirAll(t *testing.T) { c, err := GetSftpClient(LoginUser, LoginPasswd, "192.168.18.128", 22) - if err!=nil { + if err != nil { t.Error(err) return } - if err:=RemoteMkdirAll(testrpath,c);err!=nil { + if err := RemoteMkdirAll(testrpath, c); err != nil { t.Error(err) } } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 52e595c..81222de 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -415,7 +415,7 @@ func (s *Supervisor) handleTask(i Task) { case *OOMTask: err = s.oom(t) case *MigrationTask: - err=s.StartMigration(t) + err = s.StartMigration(t) default: err = ErrUnknownTask } diff --git a/vendor/github.com/go-check/check/benchmark.go b/vendor/github.com/go-check/check/benchmark.go index 46ea9dc..b2d3519 100644 --- a/vendor/github.com/go-check/check/benchmark.go +++ b/vendor/github.com/go-check/check/benchmark.go @@ -1,9 +1,9 @@ // Copyright (c) 2012 The Go Authors. All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -13,7 +13,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR diff --git a/vendor/github.com/sirupsen/logrus/terminal_windows.go b/vendor/github.com/sirupsen/logrus/terminal_windows.go index 7a33630..540aafc 100644 --- a/vendor/github.com/sirupsen/logrus/terminal_windows.go +++ b/vendor/github.com/sirupsen/logrus/terminal_windows.go @@ -41,7 +41,7 @@ func getVersion() (float64, error) { if err != nil { return -1, err } - + // The output should be like "Microsoft Windows [Version XX.X.XXXXXX]" version := strings.Replace(stdout.String(), "\n", "", -1) version = strings.Replace(version, "\r\n", "", -1)