Skip to content

Commit

Permalink
[Experimental] - Max resource lifetime controller (docker-archive#545)
Browse files Browse the repository at this point in the history
Signed-off-by: David Chung <[email protected]>
David Chung authored May 24, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 007114d commit d30f4bf
Showing 7 changed files with 433 additions and 11 deletions.
3 changes: 2 additions & 1 deletion cmd/infrakit/main.go
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@ import (

// TODO - deprecate these in favor of the dynamic commands (see above)
//_ "github.com/docker/infrakit/cmd/infrakit/flavor"
//_ "github.com/docker/infrakit/cmd/infrakit/instance"
//_ "github.com/docker/infrakit/cmd/infrakit/group"
//_ "github.com/docker/infrakit/cmd/infrakit/instance"
//_ "github.com/docker/infrakit/cmd/infrakit/resource"

_ "github.com/docker/infrakit/cmd/infrakit/event"
@@ -32,6 +32,7 @@ import (
_ "github.com/docker/infrakit/cmd/infrakit/remote"
_ "github.com/docker/infrakit/cmd/infrakit/template"
_ "github.com/docker/infrakit/cmd/infrakit/util"
_ "github.com/docker/infrakit/cmd/infrakit/x"
)

func init() {
6 changes: 5 additions & 1 deletion cmd/infrakit/util/util.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,11 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
Short: "Utilties",
}

util.AddCommand(muxCommand(plugins), fileServerCommand(plugins), trackCommand(plugins))
util.AddCommand(
muxCommand(plugins),
fileServerCommand(plugins),
trackCommand(plugins),
)

return util
}
99 changes: 99 additions & 0 deletions cmd/infrakit/x/maxlife.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package x

import (
"os"
"strings"
"time"

"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/plugin"
instance_rpc "github.com/docker/infrakit/pkg/rpc/instance"
"github.com/docker/infrakit/pkg/spi/instance"
"github.com/docker/infrakit/pkg/x/maxlife"
"github.com/spf13/cobra"
)

func maxlifeCommand(plugins func() discovery.Plugins) *cobra.Command {

cmd := &cobra.Command{
Use: "maxlife <instance plugin name>...",
Short: "Sets max life on the given instances",
}

//name := cmd.Flags().String("name", "", "Name to use as name of this plugin")
poll := cmd.Flags().DurationP("poll", "i", 10*time.Second, "Polling interval")
maxlifeDuration := cmd.Flags().DurationP("maxlife", "m", 10*time.Minute, "Max lifetime of the resource")
flagTags := cmd.Flags().StringSliceP("tag", "t", []string{}, "Tags to filter instance by")

cmd.RunE = func(c *cobra.Command, args []string) error {

if len(args) == 0 {
cmd.Usage()
os.Exit(-1)
}

tags := toTags(*flagTags)

// Now we have a list of instance plugins to maxlife
plugins, err := getInstancePlugins(plugins, args)
if err != nil {
return err
}

// For each we start a goroutine to poll and kill instances
controllers := []*maxlife.Controller{}

for name, plugin := range plugins {

controller := maxlife.NewController(name, plugin, *poll, *maxlifeDuration, tags)
controller.Start()

controllers = append(controllers, controller)
}

// TODO - publish events when we start taking down instances.
done := make(chan struct{})

<-done
return nil
}

return cmd
}

func ensureMaxlife(name string, plugin instance.Plugin, stop chan struct{}, poll, maxlife time.Duration,
tags map[string]string, initialCount int) {
}
func getInstancePlugins(plugins func() discovery.Plugins, names []string) (map[string]instance.Plugin, error) {
targets := map[string]instance.Plugin{}
for _, target := range names {
endpoint, err := plugins().Find(plugin.Name(target))
if err != nil {
return nil, err
}
if p, err := instance_rpc.NewClient(plugin.Name(target), endpoint.Address); err == nil {
targets[target] = p
} else {
return nil, err
}
}
return targets, nil
}

func toTags(slice []string) map[string]string {
tags := map[string]string{}

for _, tag := range slice {
kv := strings.SplitN(tag, "=", 2)
if len(kv) != 2 {
log.Warn("bad format tag", "input", tag)
continue
}
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
if key != "" && val != "" {
tags[key] = val
}
}
return tags
}
29 changes: 29 additions & 0 deletions cmd/infrakit/x/x.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package x

import (
"github.com/docker/infrakit/cmd/infrakit/base"
"github.com/docker/infrakit/pkg/discovery"
logutil "github.com/docker/infrakit/pkg/log"
"github.com/spf13/cobra"
)

var log = logutil.New("module", "cli/x")

func init() {
base.Register(Command)
}

// Command is the head of this module
func Command(plugins func() discovery.Plugins) *cobra.Command {

experimental := &cobra.Command{
Use: "x",
Short: "Experimental features",
}

experimental.AddCommand(
maxlifeCommand(plugins),
)

return experimental
}
41 changes: 32 additions & 9 deletions pkg/types/link.go
Original file line number Diff line number Diff line change
@@ -20,25 +20,42 @@ func init() {
type Link struct {
value string
context string
created time.Time
}

// NewLink creates a link
func NewLink() *Link {
return &Link{
value: randomAlphaNumericString(16),
value: randomAlphaNumericString(16),
created: time.Now(),
}
}

// NewLinkFromMap constructs a link from data in the map
// Link related labels
const (
LinkLabel = "infrakit-link"
LinkContextLabel = "infrakit-link-context"
LinkCreatedLabel = "infrakit-link-created"
)

// NewLinkFromMap constructs a link from data in the map. The link will have missing data
// if the input does not contain the attribute labels.
func NewLinkFromMap(m map[string]string) *Link {
l := &Link{}
if v, has := m["infrakit-link"]; has {
if v, has := m[LinkLabel]; has {
l.value = v
}

if v, has := m["infrakit-link-context"]; has {
if v, has := m[LinkContextLabel]; has {
l.context = v
}

if v, has := m[LinkCreatedLabel]; has {
t, err := time.Parse(time.RFC3339, v)
if err == nil {
l.created = t
}
}
return l
}

@@ -52,9 +69,14 @@ func (l Link) Value() string {
return l.value
}

// Created returns the creation time of the link
func (l Link) Created() time.Time {
return l.created
}

// Label returns the label to look for the link
func (l Link) Label() string {
return "infrakit-link"
return LinkLabel
}

// Context returns the context of the link
@@ -80,8 +102,9 @@ func (l *Link) KVPairs() []string {
// Map returns a representation that is easily converted to JSON or YAML
func (l *Link) Map() map[string]string {
return map[string]string{
"infrakit-link": l.value,
"infrakit-link-context": l.context,
LinkLabel: l.value,
LinkContextLabel: l.context,
LinkCreatedLabel: l.created.Format(time.RFC3339),
}
}

@@ -94,15 +117,15 @@ func (l *Link) WriteMap(target map[string]string) {

// InMap returns true if the link is contained in the map
func (l *Link) InMap(m map[string]string) bool {
c, has := m["infrakit-link-context"]
c, has := m[LinkContextLabel]
if !has {
return false
}
if c != l.context {
return false
}

v, has := m["infrakit-link"]
v, has := m[LinkLabel]
if !has {
return false
}
151 changes: 151 additions & 0 deletions pkg/x/maxlife/maxlife.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package maxlife

import (
"math"
"time"

logutil "github.com/docker/infrakit/pkg/log"
"github.com/docker/infrakit/pkg/spi/instance"
"github.com/docker/infrakit/pkg/types"
)

var log = logutil.New("module", "x/maxlife")

// Controller is a single maxlife controller that works with a single instance
// plugin to ensure that the resource instances managed by the plugin do not
// exceed a specified lifetime.
type Controller struct {
name string
plugin instance.Plugin
poll time.Duration
maxlife time.Duration
tags map[string]string
stop chan struct{}
}

// NewController creates a controller based on the given plugin and configurations.
func NewController(name string, plugin instance.Plugin, poll, maxlife time.Duration,
tags map[string]string) *Controller {
return &Controller{
name: name,
plugin: plugin,
stop: make(chan struct{}),
poll: poll,
maxlife: maxlife,
}
}

// Stop stops the controller
func (c *Controller) Stop() {
close(c.stop)
}

// Start starts the controller running. This call does not block.
func (c *Controller) Start() {
go c.run()
}

func (c *Controller) run() {
initialCount := 0
loop:
for {
described, err := c.plugin.DescribeInstances(c.tags, false)
if err != nil {
log.Warn("cannot get initial count", "name", c.name, "err", err)
} else {
initialCount = len(described)
break loop
}

// Wait a little bit before trying again -- use the same poll interval
<-time.After(c.poll)
}

// Now we have initial state, continue with the sampling and monitoring of instances.
c.ensureMaxlife(initialCount)
}

func (c *Controller) ensureMaxlife(initialCount int) {

tick := time.Tick(c.poll)

loop:
for {

select {

case now := <-tick:

log.Info("TICK")

described, err := c.plugin.DescribeInstances(c.tags, false)
if err != nil {
// Transient error?
log.Warn("error describing instances", "name", c.name, "err", err)
continue
}

// If we are not in a steady state, don't destroy the instances. This is
// important so that we don't take down the whole cluster without restraint.
if len(described) != initialCount {
log.Info("Not steady state yet. No action")
continue
}

// Just pick a single oldest instance per polling cycle. This is so
// that we don't end up destroying the cluster by taking down too many instances
// all at once.
oldest := maxAge(described, now)

// check to make sure the age is over the maxlife
if age(oldest, now) > c.maxlife {

log.Info("Destroying", "oldest", oldest, "age", age(oldest, now), "maxlife", c.maxlife)

// terminate it and hope the group controller restores with a new intance
err = c.plugin.Destroy(oldest.ID)
if err != nil {
log.Warn("cannot destroy instance", "name", c.name, "id", oldest.ID, "err", err)
continue
}
}

case <-c.stop:
log.Info("stop requested", "name", c.name)
break loop
}
}

log.Info("maxlife stopped", "name", c.name)
return
}

// age returns the age to the nearest second
func age(instance instance.Description, now time.Time) (age time.Duration) {
link := types.NewLinkFromMap(instance.Tags)
if link.Valid() {
age = now.Sub(link.Created())
age = time.Duration(math.Floor(age.Seconds())) * time.Second
}
return
}

func maxAge(instances []instance.Description, now time.Time) (result instance.Description) {
if len(instances) == 0 || instances == nil {
return
}

// check to see if the tags of the instances have links. Links have a creation date and
// we can use it to compute the age
var max time.Duration
var found = 0
for i, instance := range instances {
age := age(instance, now)
if age > max {
max = age
found = i
}
}
result = instances[found]
return
}
115 changes: 115 additions & 0 deletions pkg/x/maxlife/maxlife_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package maxlife

import (
"fmt"
"testing"
"time"

"github.com/docker/infrakit/pkg/spi/instance"
fake "github.com/docker/infrakit/pkg/testing/instance"
"github.com/docker/infrakit/pkg/types"
"github.com/stretchr/testify/require"
)

func TestAge(t *testing.T) {

link := types.NewLink()
created := link.Created()

instance := instance.Description{
ID: instance.ID("test"),
Tags: link.Map(),
}

require.Equal(t, 1*time.Hour, age(instance, created.Add(1*time.Hour)))
require.Equal(t, 59*time.Second, age(instance, created.Add(59*time.Second)))
}

func TestMaxAge(t *testing.T) {

instances := []instance.Description{}

for i := 0; i < 3; i++ {
instances = append(instances, instance.Description{
ID: instance.ID(fmt.Sprintf("test%d", i)),
Tags: types.NewLink().Map(),
})

<-time.After(1 * time.Second)
}

require.True(t, age(instances[0], time.Now()) > 1*time.Second)
maxAge := maxAge(instances, time.Now())
require.Equal(t, "test0", string(maxAge.ID))

}

func TestStartStop(t *testing.T) {

poll := 100 * time.Millisecond
maxlife := 1 * time.Second
tags := map[string]string{}

plugin := &fake.Plugin{
DoDescribeInstances: func(tags map[string]string, details bool) ([]instance.Description, error) {
return nil, nil
},
DoDestroy: func(instance instance.ID) error {
return nil
},
}

controller := NewController("test", plugin, poll, maxlife, tags)
controller.Start()

<-time.After(1 * time.Second)

controller.Stop()
}

func TestEnsureMaxlife(t *testing.T) {

poll := 100 * time.Millisecond
maxlife := 1 * time.Second
tags := map[string]string{}

all := map[instance.ID]instance.Description{}
for i := 0; i < 5; i++ {
inst := instance.Description{
ID: instance.ID(fmt.Sprintf("%d", i)),
Tags: types.NewLink().Map(),
}
all[inst.ID] = inst
<-time.After(500 * time.Millisecond)
}

destroy := make(chan instance.ID, 2)
plugin := &fake.Plugin{
DoDescribeInstances: func(tags map[string]string, details bool) ([]instance.Description, error) {

list := []instance.Description{}
for _, inst := range all {
list = append(list, inst)
}
return list, nil
},
DoDestroy: func(instance instance.ID) error {
delete(all, instance)
destroy <- instance
return nil
},
}

controller := NewController("test", plugin, poll, maxlife, tags)

go controller.ensureMaxlife(len(all))

<-time.After(2 * time.Second)
controller.Stop()

// now read what we were destroying
d := <-destroy

require.Equal(t, instance.ID("0"), d)

}

0 comments on commit d30f4bf

Please sign in to comment.