diff --git a/containerd/main.go b/containerd/main.go index 5ca129a..4f98fbd 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -169,10 +169,6 @@ func main() { } func daemon(context *cli.Context) error { - stateDir := context.String("state-dir") - if err := os.MkdirAll(stateDir, 0755); err != nil { - return err - } s := make(chan os.Signal, 2048) signal.Notify(s, syscall.SIGTERM, syscall.SIGINT, syscall.SIGPIPE) // Split the listen string of the form proto://addr @@ -181,31 +177,11 @@ func daemon(context *cli.Context) error { if len(listenParts) != 2 { return fmt.Errorf("bad listen address format %s, expected proto://address", listenSpec) } - // Register server early to allow healthcheck to be done - server, err := startServer(listenParts[0], listenParts[1]) - if err != nil { - return err - } - sv, err := supervisor.New( - stateDir, - context.String("runtime"), - context.String("shim"), - context.StringSlice("runtime-args"), - context.Duration("start-timeout"), - context.Int("retain-count")) + + server, err := startServer(context, listenParts[0], listenParts[1]) if err != nil { return err } - types.RegisterAPIServer(server, grpcserver.NewServer(sv)) - wg := &sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) - w := supervisor.NewWorker(sv, wg) - go w.Start() - } - if err := sv.Start(); err != nil { - return err - } for ss := range s { switch ss { case syscall.SIGPIPE: @@ -219,7 +195,12 @@ func daemon(context *cli.Context) error { return nil } -func startServer(protocol, address string) (*grpc.Server, error) { +func startServer(context *cli.Context, protocol, address string) (*grpc.Server, error) { + stateDir := context.String("state-dir") + if err := os.MkdirAll(stateDir, 0755); err != nil { + return nil, err + } + // TODO: We should use TLS. // TODO: Add an option for the SocketGroup. sockets, err := listeners.Init(protocol, address, "", nil) @@ -234,12 +215,34 @@ func startServer(protocol, address string) (*grpc.Server, error) { healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(s, healthServer) + sv, err := supervisor.New( + stateDir, + context.String("runtime"), + context.String("shim"), + context.StringSlice("runtime-args"), + context.Duration("start-timeout"), + context.Int("retain-count")) + if err != nil { + return nil, err + } + types.RegisterAPIServer(s, grpcserver.NewServer(sv)) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + w := supervisor.NewWorker(sv, wg) + go w.Start() + } + go func() { logrus.Debugf("containerd: grpc api on %s", address) if err := s.Serve(l); err != nil { logrus.WithField("error", err).Fatal("containerd: serve grpc") } }() + + if err := sv.Start(); err != nil { + return nil, err + } return s, nil } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index eed68df..777e7ed 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -48,9 +48,6 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti } go s.exitHandler() go s.oomHandler() - if err := s.restore(); err != nil { - return nil, err - } return s, nil } @@ -268,6 +265,9 @@ func (s *Supervisor) notifySubscribers(e Event) { // therefore it is save to do operations in the handlers that modify state of the system or // state of the Supervisor func (s *Supervisor) Start() error { + if err := s.restore(); err != nil { + return err + } logrus.WithFields(logrus.Fields{ "stateDir": s.stateDir, "runtime": s.runtime,