Skip to content

Commit

Permalink
增加基于 hpa 的 metrics,实现在规定时间内自动扩容
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoyeshiyu committed Apr 20, 2024
1 parent fd73dea commit 4c97a76
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 47 deletions.
10 changes: 5 additions & 5 deletions .air.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ tmp_dir = "tmp"

[build]
# A plain shell command; you could use `make` as well. Example: go build -o bin/manager cmd/main.go
cmd = "go build -o /home/tmp/main cmd/main.go"
cmd = "go build -o ./tmp/main cmd/main.go"
# Binary file produced by `cmd`.
bin = "/home/tmp/main"
bin = "./tmp/main"
# Customize the binary; environment variables can be set here when running your app.
full_bin = "/home/tmp/main"
full_bin = "./tmp/main"
# Watch these filename extensions.
include_ext = ["go", "tpl", "tmpl", "html","mod","sum","conf","yaml"]
include_ext = ["go", "tpl", "tmpl", "html","mod","sum","conf"]
# Ignore these filename extensions or directories.
exclude_dir = ["assets", "tmp", "vendor", "frontend/node_modules"]
exclude_dir = ["assets", "tmp", "vendor", "frontend/node_modules","yaml"]
# Watch these directories, if specified.
include_dir = []
# Watch these files.
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func main() {
if err = (&controller.TaskCrdReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Tasks: make(map[string]controller.Task),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TaskCrd")
os.Exit(1)
Expand Down
11 changes: 8 additions & 3 deletions config/crd/bases/webapp.xiaoyeshiyu.domain_taskcrds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,23 @@ spec:
spec:
description: TaskCrdSpec defines the desired state of TaskCrd
properties:
deploy:
average_utilization:
format: int32
type: integer
deployment:
type: string
end:
type: string
end_replicas:
format: int32
type: integer
start:
hpa:
type: string
start_replicas:
max_replicas:
format: int32
type: integer
start:
type: string
type: object
status:
description: TaskCrdStatus defines the observed state of TaskCrd
Expand Down
4 changes: 2 additions & 2 deletions config/samples/webapp_v1_taskcrd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ spec:
deployment: nginx
hpa: php-apache
average_utilization: 10
start: "* * * * * *"
start: "34 22 * * *"
max_replicas: 10
end: "10 * * * * *"
end: "40 22 * * *"
end_replicas: 1
16 changes: 16 additions & 0 deletions config/samples/webapp_v1_taskcrd_2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: webapp.xiaoyeshiyu.domain/v1
kind: TaskCrd
metadata:
labels:
app.kubernetes.io/name: task-crd
app.kubernetes.io/managed-by: kustomize
name: taskcrd-sample-2
namespace: m1
spec:
deployment: nginx
hpa: php-apache
average_utilization: 10
start: "37 22 * * *"
max_replicas: 10
end: "45 22 * * *"
end_replicas: 1
152 changes: 115 additions & 37 deletions internal/controller/taskcrd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ import (
type TaskCrdReconciler struct {
client.Client
Scheme *runtime.Scheme

Tasks map[string]Task
}

// Task holds the per-TaskCrd scheduling state: the cron scheduler that fires
// the start/end jobs and the channels used to signal the worker goroutine
// running (*Task).do.
type Task struct {
	// name is the key the task is stored under in TaskCrdReconciler.Tasks;
	// Reconcile builds it with name(namespace, name).
	name string
	// cron runs the jobs registered for Spec.Start and Spec.End.
	cron *cron.Cron

	// start is sent on by the Spec.Start cron job, end by the Spec.End cron
	// job, and done is closed by stop to tell the worker goroutine to exit.
	start, end, done chan struct{}
}

// name returns the key identifying a task: the namespace and the resource
// name joined by a single dot ("<namespace>.<name>").
func name(namespace, name string) string {
	key := namespace + "."
	key += name
	return key
}

//+kubebuilder:rbac:groups=webapp.xiaoyeshiyu.domain,resources=taskcrds,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -54,13 +68,24 @@ type TaskCrdReconciler struct {
func (r *TaskCrdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

log.Log.Info("task-crd", "request", req)

// 获取 CRD
var taskCrd webappv1.TaskCrd
err := r.Client.Get(ctx, req.NamespacedName, &taskCrd)
if err != nil {
return ctrl.Result{}, err
log.Log.Error(err, "get task crd")
// delete crd
if task, ok := r.Tasks[name(taskCrd.Namespace, taskCrd.Name)]; ok {
task.stop(ctx)
return ctrl.Result{}, nil
}

return ctrl.Result{}, nil
}

log.Log.Info("task-crd", "taskCrd Info", taskCrd)

// 获取 deployment
if taskCrd.Spec.Deployment == "" {
return ctrl.Result{}, nil
Expand All @@ -83,58 +108,122 @@ func (r *TaskCrdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

var (
start = make(chan struct{})
end = make(chan struct{})
)
// 都存在,则开始执行
c := cron.New()
c.AddFunc(taskCrd.Spec.Start, func() {
start <- struct{}{}
// 获取任务
if task, ok := r.Tasks[taskCrd.Namespace+taskCrd.Name]; ok {
task.stop(ctx)
}

task := Task{
name: name(taskCrd.Namespace, taskCrd.Name),
cron: cron.New(),
start: make(chan struct{}),
end: make(chan struct{}),
done: make(chan struct{}),
}
r.Tasks[task.name] = task

// add new job
_, err = task.cron.AddFunc(taskCrd.Spec.Start, func() {
log.Log.Info("crontab", "start", time.Now())
task.start <- struct{}{}
})
if err != nil {
return ctrl.Result{}, err
}

c.AddFunc(taskCrd.Spec.End, func() {
end <- struct{}{}
_, err = task.cron.AddFunc(taskCrd.Spec.End, func() {
log.Log.Info("crontab", "end", time.Now())
task.end <- struct{}{}
})
if err != nil {
return ctrl.Result{}, err
}

go task.do(ctx, r.Client, taskCrd)

task.cron.Start()

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
// SetupWithManager registers this reconciler with mgr as the controller for
// TaskCrd resources and returns any error reported while completing the
// controller build.
func (r *TaskCrdReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bld := ctrl.NewControllerManagedBy(mgr).For(&webappv1.TaskCrd{})
	return bld.Complete(r)
}

// stop shuts the task down: it removes every registered cron entry, closes
// the done channel so the worker goroutine started for this task returns,
// and stops the cron scheduler.
//
// stop is safe to call more than once (sequentially). Task values are stored
// and copied by value while sharing the same done channel, so a second stop
// on any copy would otherwise panic on closing an already-closed channel.
// The guard is not a substitute for locking: concurrent calls to stop are
// still the caller's responsibility to serialize.
//
// ctx is currently unused; it is kept so the signature stays unchanged for
// existing callers.
func (t *Task) stop(ctx context.Context) {
	log.Log.Info("task", "stop", t.name)

	// A receive on a closed channel succeeds immediately, so this detects a
	// task that has already been stopped without blocking.
	select {
	case <-t.done:
		return
	default:
	}

	// clean all cron job
	for _, entry := range t.cron.Entries() {
		t.cron.Remove(entry.ID)
	}
	close(t.done)
	t.cron.Stop()
}

func (t *Task) do(ctx context.Context, c client.Client, taskCrd webappv1.TaskCrd) {
log.Log.Info("do start")
for {
OUT:
select {
case <-t.done:
log.Log.Info("do", "task done", t.name)
return
case <-ctx.Done():
return ctrl.Result{}, ctx.Err()
case <-start:
log.Log.Info("task start: ", time.Now())
log.Log.Info("do", "context done", t.name)
return
case <-t.start:
log.Log.Info("do", "start", time.Now(), "task", t.name)
tick := time.NewTicker(10 * time.Second)

var deploy v1.Deployment
err := c.Get(ctx, client.ObjectKey{Namespace: taskCrd.Namespace, Name: taskCrd.Spec.Deployment}, &deploy)
if err != nil {
return
}

for {
select {
case <-t.done:
log.Log.Info("do", "task done", t.name)
return
case <-ctx.Done():
return ctrl.Result{}, ctx.Err()
case <-end:
log.Log.Info("task end: ", time.Now())
log.Log.Info("do", "context done", t.name)
return
case <-t.end:
log.Log.Info("task", "end", time.Now())
err := c.Get(ctx, client.ObjectKey{Namespace: taskCrd.Namespace, Name: taskCrd.Spec.Deployment}, &deploy)
if err != nil {
return
}
atomic.StoreInt32(deploy.Spec.Replicas, taskCrd.Spec.EndReplicas)
err = r.Client.Update(ctx, &deploy)
err = c.Update(ctx, &deploy)
if err != nil {
return ctrl.Result{}, err
log.Log.Info("task", "end", err)
return
}
break OUT
case <-tick.C:
err = r.Client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: taskCrd.Spec.HPA}, &hpa)
var hpa v2.HorizontalPodAutoscaler
err = c.Get(ctx, client.ObjectKey{Namespace: taskCrd.Namespace, Name: taskCrd.Spec.HPA}, &hpa)
if err != nil {
return ctrl.Result{}, err
return
}
for _, metrics := range hpa.Status.CurrentMetrics {
// CPU 的平均使用率小于设定值,则修改对应 deployment 副本数目
if metrics.Resource.Name == corev1.ResourceCPU && *metrics.Resource.Current.AverageUtilization < taskCrd.Spec.AverageUtilization {
err = r.Client.Get(ctx, client.ObjectKey{Namespace: taskCrd.Namespace, Name: taskCrd.Spec.Deployment}, &deploy)
err = c.Get(ctx, client.ObjectKey{Namespace: taskCrd.Namespace, Name: taskCrd.Spec.Deployment}, &deploy)
if err != nil {
return ctrl.Result{}, err
return
}
log.Log.Info("get deploy", deploy.Name, "replicas", deploy.Status.Replicas)
log.Log.Info("task", "deployment", deploy.Name, "replicas", deploy.Status.Replicas)
if deploy.Status.Replicas < taskCrd.Spec.MaxReplicas {
atomic.AddInt32(deploy.Spec.Replicas, 1)
err = r.Client.Update(ctx, &deploy)
err = c.Update(ctx, &deploy)
if err != nil {
return ctrl.Result{}, err
return
}
} else {
// 不小于,则结束
Expand All @@ -146,15 +235,4 @@ func (r *TaskCrdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}
}

c.Stop()

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TaskCrdReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.TaskCrd{}).
Complete(r)
}
1 change: 1 addition & 0 deletions tmp/air.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
exit status 1

0 comments on commit 4c97a76

Please sign in to comment.