diff --git a/cmd/infrakit/main.go b/cmd/infrakit/main.go index 6d9142936..1fb0b1b33 100644 --- a/cmd/infrakit/main.go +++ b/cmd/infrakit/main.go @@ -19,8 +19,8 @@ import ( // TODO - deprecate these in favor of the dynamic commands (see above) //_ "github.com/docker/infrakit/cmd/infrakit/flavor" - //_ "github.com/docker/infrakit/cmd/infrakit/instance" //_ "github.com/docker/infrakit/cmd/infrakit/group" + //_ "github.com/docker/infrakit/cmd/infrakit/instance" //_ "github.com/docker/infrakit/cmd/infrakit/resource" _ "github.com/docker/infrakit/cmd/infrakit/event" @@ -32,6 +32,7 @@ import ( _ "github.com/docker/infrakit/cmd/infrakit/remote" _ "github.com/docker/infrakit/cmd/infrakit/template" _ "github.com/docker/infrakit/cmd/infrakit/util" + _ "github.com/docker/infrakit/cmd/infrakit/x" ) func init() { diff --git a/cmd/infrakit/util/util.go b/cmd/infrakit/util/util.go index 4ada0f8ad..812c1c7fc 100644 --- a/cmd/infrakit/util/util.go +++ b/cmd/infrakit/util/util.go @@ -21,7 +21,11 @@ func Command(plugins func() discovery.Plugins) *cobra.Command { Short: "Utilties", } - util.AddCommand(muxCommand(plugins), fileServerCommand(plugins), trackCommand(plugins)) + util.AddCommand( + muxCommand(plugins), + fileServerCommand(plugins), + trackCommand(plugins), + ) return util } diff --git a/cmd/infrakit/x/maxlife.go b/cmd/infrakit/x/maxlife.go new file mode 100644 index 000000000..59498fd19 --- /dev/null +++ b/cmd/infrakit/x/maxlife.go @@ -0,0 +1,99 @@ +package x + +import ( + "os" + "strings" + "time" + + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin" + instance_rpc "github.com/docker/infrakit/pkg/rpc/instance" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/x/maxlife" + "github.com/spf13/cobra" +) + +func maxlifeCommand(plugins func() discovery.Plugins) *cobra.Command { + + cmd := &cobra.Command{ + Use: "maxlife ...", + Short: "Sets max life on the given instances", + } + + //name := cmd.Flags().String("name", "", "Name to use as name of this plugin") + poll := cmd.Flags().DurationP("poll", "i", 10*time.Second, "Polling interval") + maxlifeDuration := cmd.Flags().DurationP("maxlife", "m", 10*time.Minute, "Max lifetime of the resource") + flagTags := cmd.Flags().StringSliceP("tag", "t", []string{}, "Tags to filter instance by") + + cmd.RunE = func(c *cobra.Command, args []string) error { + + if len(args) == 0 { + cmd.Usage() + os.Exit(-1) + } + + tags := toTags(*flagTags) + + // Now we have a list of instance plugins to maxlife + plugins, err := getInstancePlugins(plugins, args) + if err != nil { + return err + } + + // For each we start a goroutine to poll and kill instances + controllers := []*maxlife.Controller{} + + for name, plugin := range plugins { + + controller := maxlife.NewController(name, plugin, *poll, *maxlifeDuration, tags) + controller.Start() + + controllers = append(controllers, controller) + } + + // TODO - publish events when we start taking down instances. + done := make(chan struct{}) + + <-done + return nil + } + + return cmd +} + +func ensureMaxlife(name string, plugin instance.Plugin, stop chan struct{}, poll, maxlife time.Duration, + tags map[string]string, initialCount int) { +} +func getInstancePlugins(plugins func() discovery.Plugins, names []string) (map[string]instance.Plugin, error) { + targets := map[string]instance.Plugin{} + for _, target := range names { + endpoint, err := plugins().Find(plugin.Name(target)) + if err != nil { + return nil, err + } + if p, err := instance_rpc.NewClient(plugin.Name(target), endpoint.Address); err == nil { + targets[target] = p + } else { + return nil, err + } + } + return targets, nil +} + +func toTags(slice []string) map[string]string { + tags := map[string]string{} + + for _, tag := range slice { + kv := strings.SplitN(tag, "=", 2) + if len(kv) != 2 { + log.Warn("bad format tag", "input", tag) + continue + } + key := strings.TrimSpace(kv[0]) + val := strings.TrimSpace(kv[1]) + if key != "" && val != "" { + tags[key] = val + } + } + return tags +} diff --git a/cmd/infrakit/x/x.go b/cmd/infrakit/x/x.go new file mode 100644 index 000000000..cfdf1ad07 --- /dev/null +++ b/cmd/infrakit/x/x.go @@ -0,0 +1,29 @@ +package x + +import ( + "github.com/docker/infrakit/cmd/infrakit/base" + "github.com/docker/infrakit/pkg/discovery" + logutil "github.com/docker/infrakit/pkg/log" + "github.com/spf13/cobra" +) + +var log = logutil.New("module", "cli/x") + +func init() { + base.Register(Command) +} + +// Command is the head of this module +func Command(plugins func() discovery.Plugins) *cobra.Command { + + experimental := &cobra.Command{ + Use: "x", + Short: "Experimental features", + } + + experimental.AddCommand( + maxlifeCommand(plugins), + ) + + return experimental +} diff --git a/pkg/types/link.go b/pkg/types/link.go index 1507b712d..e2dfece34 100644 --- a/pkg/types/link.go +++ b/pkg/types/link.go @@ -20,25 +20,42 @@ func init() { type Link struct { value string context string + created time.Time } // NewLink creates a link func NewLink() *Link { return &Link{ - value: randomAlphaNumericString(16), + value: randomAlphaNumericString(16), + created: time.Now(), } } -// NewLinkFromMap constructs a link from data in the map +// Link related labels +const ( + LinkLabel = "infrakit-link" + LinkContextLabel = "infrakit-link-context" + LinkCreatedLabel = "infrakit-link-created" +) + +// NewLinkFromMap constructs a link from data in the map. The link will have missing data +// if the input does not contain the attribute labels. func NewLinkFromMap(m map[string]string) *Link { l := &Link{} - if v, has := m["infrakit-link"]; has { + if v, has := m[LinkLabel]; has { l.value = v } - if v, has := m["infrakit-link-context"]; has { + if v, has := m[LinkContextLabel]; has { l.context = v } + + if v, has := m[LinkCreatedLabel]; has { + t, err := time.Parse(time.RFC3339, v) + if err == nil { + l.created = t + } + } return l } @@ -52,9 +69,14 @@ func (l Link) Value() string { return l.value } +// Created returns the creation time of the link +func (l Link) Created() time.Time { + return l.created +} + // Label returns the label to look for the link func (l Link) Label() string { - return "infrakit-link" + return LinkLabel } // Context returns the context of the link @@ -80,8 +102,9 @@ func (l *Link) KVPairs() []string { // Map returns a representation that is easily converted to JSON or YAML func (l *Link) Map() map[string]string { return map[string]string{ - "infrakit-link": l.value, - "infrakit-link-context": l.context, + LinkLabel: l.value, + LinkContextLabel: l.context, + LinkCreatedLabel: l.created.Format(time.RFC3339), } } @@ -94,7 +117,7 @@ func (l *Link) WriteMap(target map[string]string) { // InMap returns true if the link is contained in the map func (l *Link) InMap(m map[string]string) bool { - c, has := m["infrakit-link-context"] + c, has := m[LinkContextLabel] if !has { return false } @@ -102,7 +125,7 @@ func (l *Link) InMap(m map[string]string) bool { return false } - v, has := m["infrakit-link"] + v, has := m[LinkLabel] if !has { return false } diff --git a/pkg/x/maxlife/maxlife.go b/pkg/x/maxlife/maxlife.go new file mode 100644 index 000000000..05a24d50d --- /dev/null +++ b/pkg/x/maxlife/maxlife.go @@ -0,0 +1,151 @@ +package maxlife + +import ( + "math" + "time" + + logutil "github.com/docker/infrakit/pkg/log" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/types" +) + +var log = logutil.New("module", "x/maxlife") + +// Controller is a single maxlife controller that works with a single instance +// plugin to ensure that the resource instances managed by the plugin do not +// exceed a specified lifetime. +type Controller struct { + name string + plugin instance.Plugin + poll time.Duration + maxlife time.Duration + tags map[string]string + stop chan struct{} +} + +// NewController creates a controller based on the given plugin and configurations. +func NewController(name string, plugin instance.Plugin, poll, maxlife time.Duration, + tags map[string]string) *Controller { + return &Controller{ + name: name, + plugin: plugin, + stop: make(chan struct{}), + poll: poll, + maxlife: maxlife, + } +} + +// Stop stops the controller +func (c *Controller) Stop() { + close(c.stop) +} + +// Start starts the controller running. This call does not block. +func (c *Controller) Start() { + go c.run() +} + +func (c *Controller) run() { + initialCount := 0 +loop: + for { + described, err := c.plugin.DescribeInstances(c.tags, false) + if err != nil { + log.Warn("cannot get initial count", "name", c.name, "err", err) + } else { + initialCount = len(described) + break loop + } + + // Wait a little bit before trying again -- use the same poll interval + <-time.After(c.poll) + } + + // Now we have initial state, continue with the sampling and monitoring of instances. + c.ensureMaxlife(initialCount) +} + +func (c *Controller) ensureMaxlife(initialCount int) { + + tick := time.Tick(c.poll) + +loop: + for { + + select { + + case now := <-tick: + + log.Info("TICK") + + described, err := c.plugin.DescribeInstances(c.tags, false) + if err != nil { + // Transient error? + log.Warn("error describing instances", "name", c.name, "err", err) + continue + } + + // If we are not in a steady state, don't destroy the instances. This is + // important so that we don't take down the whole cluster without restraint. + if len(described) != initialCount { + log.Info("Not steady state yet. No action") + continue + } + + // Just pick a single oldest instance per polling cycle. This is so + // that we don't end up destroying the cluster by taking down too many instances + // all at once. + oldest := maxAge(described, now) + + // check to make sure the age is over the maxlife + if age(oldest, now) > c.maxlife { + + log.Info("Destroying", "oldest", oldest, "age", age(oldest, now), "maxlife", c.maxlife) + + // terminate it and hope the group controller restores with a new intance + err = c.plugin.Destroy(oldest.ID) + if err != nil { + log.Warn("cannot destroy instance", "name", c.name, "id", oldest.ID, "err", err) + continue + } + } + + case <-c.stop: + log.Info("stop requested", "name", c.name) + break loop + } + } + + log.Info("maxlife stopped", "name", c.name) + return +} + +// age returns the age to the nearest second +func age(instance instance.Description, now time.Time) (age time.Duration) { + link := types.NewLinkFromMap(instance.Tags) + if link.Valid() { + age = now.Sub(link.Created()) + age = time.Duration(math.Floor(age.Seconds())) * time.Second + } + return +} + +func maxAge(instances []instance.Description, now time.Time) (result instance.Description) { + if len(instances) == 0 || instances == nil { + return + } + + // check to see if the tags of the instances have links. Links have a creation date and + // we can use it to compute the age + var max time.Duration + var found = 0 + for i, instance := range instances { + age := age(instance, now) + if age > max { + max = age + found = i + } + } + result = instances[found] + return +} diff --git a/pkg/x/maxlife/maxlife_test.go b/pkg/x/maxlife/maxlife_test.go new file mode 100644 index 000000000..1fa007b5b --- /dev/null +++ b/pkg/x/maxlife/maxlife_test.go @@ -0,0 +1,115 @@ +package maxlife + +import ( + "fmt" + "testing" + "time" + + "github.com/docker/infrakit/pkg/spi/instance" + fake "github.com/docker/infrakit/pkg/testing/instance" + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +func TestAge(t *testing.T) { + + link := types.NewLink() + created := link.Created() + + instance := instance.Description{ + ID: instance.ID("test"), + Tags: link.Map(), + } + + require.Equal(t, 1*time.Hour, age(instance, created.Add(1*time.Hour))) + require.Equal(t, 59*time.Second, age(instance, created.Add(59*time.Second))) +} + +func TestMaxAge(t *testing.T) { + + instances := []instance.Description{} + + for i := 0; i < 3; i++ { + instances = append(instances, instance.Description{ + ID: instance.ID(fmt.Sprintf("test%d", i)), + Tags: types.NewLink().Map(), + }) + + <-time.After(1 * time.Second) + } + + require.True(t, age(instances[0], time.Now()) > 1*time.Second) + maxAge := maxAge(instances, time.Now()) + require.Equal(t, "test0", string(maxAge.ID)) + +} + +func TestStartStop(t *testing.T) { + + poll := 100 * time.Millisecond + maxlife := 1 * time.Second + tags := map[string]string{} + + plugin := &fake.Plugin{ + DoDescribeInstances: func(tags map[string]string, details bool) ([]instance.Description, error) { + return nil, nil + }, + DoDestroy: func(instance instance.ID) error { + return nil + }, + } + + controller := NewController("test", plugin, poll, maxlife, tags) + controller.Start() + + <-time.After(1 * time.Second) + + controller.Stop() +} + +func TestEnsureMaxlife(t *testing.T) { + + poll := 100 * time.Millisecond + maxlife := 1 * time.Second + tags := map[string]string{} + + all := map[instance.ID]instance.Description{} + for i := 0; i < 5; i++ { + inst := instance.Description{ + ID: instance.ID(fmt.Sprintf("%d", i)), + Tags: types.NewLink().Map(), + } + all[inst.ID] = inst + <-time.After(500 * time.Millisecond) + } + + destroy := make(chan instance.ID, 2) + plugin := &fake.Plugin{ + DoDescribeInstances: func(tags map[string]string, details bool) ([]instance.Description, error) { + + list := []instance.Description{} + for _, inst := range all { + list = append(list, inst) + } + return list, nil + }, + DoDestroy: func(instance instance.ID) error { + delete(all, instance) + destroy <- instance + return nil + }, + } + + controller := NewController("test", plugin, poll, maxlife, tags) + + go controller.ensureMaxlife(len(all)) + + <-time.After(2 * time.Second) + controller.Stop() + + // now read what we were destroying + d := <-destroy + + require.Equal(t, instance.ID("0"), d) + +}