Skip to content

Commit

Permalink
Sync receive console with io copy routines
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Crosby <[email protected]>
  • Loading branch information
crosbymichael committed May 3, 2017
1 parent a34bc2b commit d10daaf
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 46 deletions.
19 changes: 14 additions & 5 deletions containerd-shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type controlMessage struct {
// Arg2: runtime binary
func main() {
flag.Parse()
cwd, err := os.Getwd()
if err != nil {
panic(err)
}
f, err := os.OpenFile(filepath.Join(cwd, "shim-log.json"), os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0666)
/*
cwd, err := os.Getwd()
if err != nil {
panic(err)
}
*/
f, err := os.OpenFile(filepath.Join("/tmp", "shim-log.json"), os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0666)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -82,6 +84,9 @@ func start(log *os.File) error {
if err != nil {
return err
}
if err := p.openIO(); err != nil {
return err
}
defer func() {
if err := p.Close(); err != nil {
writeMessage(log, "warn", err)
Expand All @@ -91,6 +96,10 @@ func start(log *os.File) error {
p.delete()
return err
}
if err := <-p.consoleErrCh; err != nil {
p.delete()
return err
}
msgC := make(chan controlMessage, 32)
go func() {
for {
Expand Down
15 changes: 7 additions & 8 deletions containerd-shim/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ type process struct {
runtime string
ioCleanupFn func()

console console.Console
socket *runc.Socket
socket *runc.Socket
console console.Console
consoleErrCh chan error
}

func newProcess(id, bundle, runtimeName string) (*process, error) {
p := &process{
id: id,
bundle: bundle,
runtime: runtimeName,
id: id,
bundle: bundle,
runtime: runtimeName,
consoleErrCh: make(chan error, 1),
}
s, err := loadProcess()
if err != nil {
Expand All @@ -90,9 +92,6 @@ func newProcess(id, bundle, runtimeName string) (*process, error) {
p.checkpoint = cpt
p.checkpointPath = s.CheckpointPath
}
if err := p.openIO(); err != nil {
return nil, err
}
return p, nil
}

Expand Down
101 changes: 69 additions & 32 deletions containerd-shim/process_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/containerd/containerd/osutils"
"github.com/crosbymichael/console"
runc "github.com/crosbymichael/go-runc"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
Expand Down Expand Up @@ -38,47 +39,39 @@ func (p *process) openIO() error {
return err
}
p.socket = socket
consoleCh := p.waitConsole(socket)

go func() error {
master, err := socket.ReceiveMaster()
if err != nil {
return err
}
p.console = master
stdin, err := fifo.OpenFifo(ctx, p.state.Stdin, syscall.O_RDONLY, 0)
if err != nil {
return err
}
go io.Copy(master, stdin)
stdoutw, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return err
}
stdoutr, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_RDONLY, 0)
if err != nil {
return err
}
p.Add(1)
p.ioCleanupFn = func() {
master.Close()
stdoutr.Close()
stdoutw.Close()
}
go func() {
io.Copy(stdoutw, master)
p.Done()
}()
return nil
}()
stdin, err := fifo.OpenFifo(ctx, p.state.Stdin, syscall.O_RDONLY, 0)
if err != nil {
return err
}
stdoutw, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return err
}
stdoutr, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_RDONLY, 0)
if err != nil {
return err
}
// open the fifos but wait until we receive the console before we start
// copying data back and forth between the two
go p.setConsole(consoleCh, stdin, stdoutw)

p.Add(1)
p.ioCleanupFn = func() {
stdoutr.Close()
stdoutw.Close()
}
return nil
}
close(p.consoleErrCh)
i, err := p.initializeIO(uid, gid)
if err != nil {
return err
}
p.shimIO = i
// non-tty
var ioClosers []io.Closer
ioClosers := []io.Closer{}
for _, pair := range []struct {
name string
dest func(wc io.WriteCloser, rc io.Closer)
Expand Down Expand Up @@ -138,6 +131,9 @@ func (p *process) Wait() {
if p.ioCleanupFn != nil {
p.ioCleanupFn()
}
if p.console != nil {
p.console.Close()
}
}

func (p *process) killAll() error {
Expand All @@ -151,3 +147,44 @@ func (p *process) killAll() error {
}
return nil
}

func (p *process) setConsole(c <-chan *consoleR, stdin io.Reader, stdout io.Writer) {
r := <-c
if r.err != nil {
p.consoleErrCh <- r.err
return
} else {
close(p.consoleErrCh)
}
p.console = r.c
// copy from the console into the provided fifos
go io.Copy(r.c, stdin)
go func() {
io.Copy(stdout, r.c)
p.Done()
}()
}

type consoleR struct {
c console.Console
err error
}

func (p *process) waitConsole(socket *runc.Socket) <-chan *consoleR {
c := make(chan *consoleR, 1)
go func() {
master, err := socket.ReceiveMaster()
socket.Close()
if err != nil {
c <- &consoleR{
err: err,
}
return
}
c <- &consoleR{
c: master,
}
}()
return c

}
12 changes: 11 additions & 1 deletion ctr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"text/tabwriter"
"time"

"github.com/urfave/cli"
"github.com/containerd/containerd/api/grpc/types"
"github.com/containerd/containerd/specs"
"github.com/crosbymichael/console"
"github.com/golang/protobuf/ptypes"
"github.com/urfave/cli"
netcontext "golang.org/x/net/context"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
Expand Down Expand Up @@ -260,6 +260,16 @@ var startCommand = cli.Command{
}
}()
}
time.Sleep(2 * time.Second)
log.Println("closing stdin now")
if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{
Id: id,
Pid: "init",
CloseStdin: true,
}); err != nil {
fatal(err.Error(), 1)
}

waitForExit(c, events, id, "init", con)
}
return nil
Expand Down

0 comments on commit d10daaf

Please sign in to comment.