From 98fd0b547a076a1fb87229065f8dbb416fa7dcf5 Mon Sep 17 00:00:00 2001 From: Shukui Yang Date: Fri, 14 Jul 2017 09:58:21 +0000 Subject: [PATCH] Fix a small probability panic for containerd. containerd-->main-->daemon step 1 |--> grpc_health_v1.RegisterHealthServer --> register(with lock)--> s.m[sd.ServiceName] = srv (map write) | step 2 |--> s.Serve(l)--> go s.handleStream(without lock) --> srv, ok := s.m[service] (map read) | step 3 |--> types.RegisterAPIServer --> register(with lock)--> s.m[sd.ServiceName] = srv (map write) In containerd's current code, step 3 is called after step 1 and step 2, so the map write in step 3 can race with the unlocked map read in step 2 and may lead to "concurrent map read and map write". The correct sequence is for step 2 to come last (step 1/3 --> step 2). For example, the following code, package main import ( "time" ) func noUse(k, v int) { } func main() { mp := make(map[int]int) go func() { for { for k, v := range mp { noUse(k, v) } time.Sleep(1 * time.Millisecond) } }() go func() { i := 0 for { mp[i] = i i++ time.Sleep(1 * time.Millisecond) } }() time.Sleep(100 * time.Second) } can panic like this: fatal error: concurrent map read and map write goroutine 4 [running]: runtime.throw(0x491ee0, 0x21) /usr/local/go/src/runtime/panic.go:547 +0x90 fp=0xc82002d660 sp=0xc82002d648 runtime.mapaccessK(0x460ee0, 0xc820010120, 0xc8202b8b40, 0x460ee0, 0x480200) /usr/local/go/src/runtime/hashmap.go:389 +0x5a fp=0xc82002d6a8 sp=0xc82002d660 runtime.mapiternext(0xc82002d750) /usr/local/go/src/runtime/hashmap.go:774 +0x63e fp=0xc82002d738 sp=0xc82002d6a8 main.main.func1(0xc820010120) /root/workspace/go/bin/concurrent.go:33 +0x79 fp=0xc82002d7b8 sp=0xc82002d738 runtime.goexit() /usr/local/go/src/runtime/asm_amd64.s:1998 +0x1 fp=0xc82002d7c0 sp=0xc82002d7b8 created by main.main /root/workspace/go/bin/concurrent.go:38 +0x69 goroutine 1 [sleep]: time.Sleep(0x174876e800) /usr/local/go/src/runtime/time.go:59 +0xf9 main.main() /root/workspace/go/bin/concurrent.go:48 +0x9e goroutine 5 [sleep]: time.Sleep(0x989680) 
/usr/local/go/src/runtime/time.go:59 +0xf9 main.main.func2(0xc820010120) /root/workspace/go/bin/concurrent.go:44 +0x71 created by main.main /root/workspace/go/bin/concurrent.go:46 +0x8 Signed-off-by: Shukui Yang --- containerd/main.go | 57 +++++++++++++++++++++------------------- supervisor/supervisor.go | 6 ++--- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/containerd/main.go b/containerd/main.go index 5ca129a..4f98fbd 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -169,10 +169,6 @@ func main() { } func daemon(context *cli.Context) error { - stateDir := context.String("state-dir") - if err := os.MkdirAll(stateDir, 0755); err != nil { - return err - } s := make(chan os.Signal, 2048) signal.Notify(s, syscall.SIGTERM, syscall.SIGINT, syscall.SIGPIPE) // Split the listen string of the form proto://addr @@ -181,31 +177,11 @@ func daemon(context *cli.Context) error { if len(listenParts) != 2 { return fmt.Errorf("bad listen address format %s, expected proto://address", listenSpec) } - // Register server early to allow healthcheck to be done - server, err := startServer(listenParts[0], listenParts[1]) - if err != nil { - return err - } - sv, err := supervisor.New( - stateDir, - context.String("runtime"), - context.String("shim"), - context.StringSlice("runtime-args"), - context.Duration("start-timeout"), - context.Int("retain-count")) + + server, err := startServer(context, listenParts[0], listenParts[1]) if err != nil { return err } - types.RegisterAPIServer(server, grpcserver.NewServer(sv)) - wg := &sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) - w := supervisor.NewWorker(sv, wg) - go w.Start() - } - if err := sv.Start(); err != nil { - return err - } for ss := range s { switch ss { case syscall.SIGPIPE: @@ -219,7 +195,12 @@ func daemon(context *cli.Context) error { return nil } -func startServer(protocol, address string) (*grpc.Server, error) { +func startServer(context *cli.Context, protocol, address string) (*grpc.Server, 
error) { + stateDir := context.String("state-dir") + if err := os.MkdirAll(stateDir, 0755); err != nil { + return nil, err + } + // TODO: We should use TLS. // TODO: Add an option for the SocketGroup. sockets, err := listeners.Init(protocol, address, "", nil) @@ -234,12 +215,34 @@ func startServer(protocol, address string) (*grpc.Server, error) { healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(s, healthServer) + sv, err := supervisor.New( + stateDir, + context.String("runtime"), + context.String("shim"), + context.StringSlice("runtime-args"), + context.Duration("start-timeout"), + context.Int("retain-count")) + if err != nil { + return nil, err + } + types.RegisterAPIServer(s, grpcserver.NewServer(sv)) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + w := supervisor.NewWorker(sv, wg) + go w.Start() + } + go func() { logrus.Debugf("containerd: grpc api on %s", address) if err := s.Serve(l); err != nil { logrus.WithField("error", err).Fatal("containerd: serve grpc") } }() + + if err := sv.Start(); err != nil { + return nil, err + } return s, nil } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index eed68df..777e7ed 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -48,9 +48,6 @@ func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, ti } go s.exitHandler() go s.oomHandler() - if err := s.restore(); err != nil { - return nil, err - } return s, nil } @@ -268,6 +265,9 @@ func (s *Supervisor) notifySubscribers(e Event) { // therefore it is save to do operations in the handlers that modify state of the system or // state of the Supervisor func (s *Supervisor) Start() error { + if err := s.restore(); err != nil { + return err + } logrus.WithFields(logrus.Fields{ "stateDir": s.stateDir, "runtime": s.runtime,