Skip to content

Commit 1d6a669

Browse files
committed
feat: add task and proc delete API
1 parent 2029ccf commit 1d6a669

File tree

8 files changed

+192
-2
lines changed

8 files changed

+192
-2
lines changed

api/api.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ func Start() {
1212

1313
func initRouter(g *gin.Engine) {
1414
g.GET("/completed", TaskAPI.TaskCompleteHandler)
15+
1516
g.GET("/base/info", BaseAPI.Info)
17+
1618
g.GET("/processor/info", ProcessorAPI.Info)
17-
g.GET("task/info", TaskAPI.Info)
19+
g.POST("/processor/delete", ProcessorAPI.Delete)
20+
21+
g.GET("/task/info", TaskAPI.Info)
22+
g.POST("/task/delete", TaskAPI.Delete)
1823
}

api/common.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package api
2+
3+
import (
4+
"github.com/gin-gonic/gin"
5+
"net/http"
6+
)
7+
8+
func Response(c *gin.Context, obj any) {
9+
c.JSON(http.StatusOK, obj)
10+
}

api/model.go

+21
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,24 @@ type taskInfo struct {
6262
StartAt int64 `json:"start_at"`
6363
RunTime int64 `json:"run_time"`
6464
}
65+
66+
type ProcessorDeleteReq struct {
67+
ID string `form:"id" binding:"required"`
68+
Force bool `form:"force"`
69+
}
70+
71+
type ProcessorDeleteResp struct {
72+
BaseResp
73+
Success bool `json:"success"`
74+
Info string `json:"info"`
75+
}
76+
77+
type TaskDeleteReq struct {
78+
ID string `form:"id" binding:"required"`
79+
}
80+
81+
type TaskDeleteResp struct {
82+
BaseResp
83+
Success bool `json:"success"`
84+
Info string `json:"info"`
85+
}

api/processor.go

+33
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package api
22

33
import (
4+
"fmt"
45
"github.com/gin-gonic/gin"
56
"net/http"
67
"rscheduler/pkg/utils"
@@ -14,6 +15,7 @@ type processorAPI struct {
1415
var ProcessorAPI = &processorAPI{}
1516

1617
func (p processorAPI) Info(c *gin.Context) {
18+
// TODO 后续重构,放在这边不太好
1719
scheduler.RScheduler.Lock.RLock()
1820
defer scheduler.RScheduler.Lock.RUnlock()
1921

@@ -39,3 +41,34 @@ func (p processorAPI) Info(c *gin.Context) {
3941
ProcessorInfo: processorInfoList,
4042
})
4143
}
44+
45+
func (p processorAPI) Delete(c *gin.Context) {
46+
var req ProcessorDeleteReq
47+
if err := c.ShouldBindQuery(&req); err != nil {
48+
failResp := ProcessorDeleteResp{
49+
BaseResp: NewBaseFailResp(),
50+
Success: false,
51+
Info: err.Error(),
52+
}
53+
Response(c, failResp)
54+
return
55+
}
56+
57+
err := scheduler.KillProcByProcID(req.ID, req.Force)
58+
if err != nil {
59+
failResp := ProcessorDeleteResp{
60+
BaseResp: NewBaseFailResp(),
61+
Success: false,
62+
Info: err.Error(),
63+
}
64+
Response(c, failResp)
65+
return
66+
}
67+
68+
successResp := ProcessorDeleteResp{
69+
BaseResp: NewBaseSuccessResp(),
70+
Success: true,
71+
Info: fmt.Sprintf("Processor %s delete success", req.ID),
72+
}
73+
Response(c, successResp)
74+
}

api/task.go

+33
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package api
22

33
import (
4+
"fmt"
45
"github.com/gin-gonic/gin"
56
"github.com/liang09255/lutils/conv"
67
"net/http"
@@ -23,6 +24,7 @@ func (t taskAPI) TaskCompleteHandler(c *gin.Context) {
2324
}
2425

2526
func (t taskAPI) Info(c *gin.Context) {
27+
// TODO 后续重构,放在这边不太好
2628
scheduler.RScheduler.Lock.RLock()
2729
defer scheduler.RScheduler.Lock.RUnlock()
2830

@@ -58,3 +60,34 @@ func (t taskAPI) Info(c *gin.Context) {
5860
TaskInfo: taskInfoList,
5961
})
6062
}
63+
64+
func (t taskAPI) Delete(c *gin.Context) {
65+
var req TaskDeleteReq
66+
if err := c.ShouldBindQuery(&req); err != nil {
67+
failResp := TaskDeleteResp{
68+
BaseResp: NewBaseFailResp(),
69+
Success: false,
70+
Info: err.Error(),
71+
}
72+
Response(c, failResp)
73+
return
74+
}
75+
76+
err := scheduler.KillProcByTaskID(req.ID)
77+
if err != nil {
78+
failResp := TaskDeleteResp{
79+
BaseResp: NewBaseFailResp(),
80+
Success: false,
81+
Info: err.Error(),
82+
}
83+
Response(c, failResp)
84+
return
85+
}
86+
87+
successResp := TaskDeleteResp{
88+
BaseResp: NewBaseSuccessResp(),
89+
Success: true,
90+
Info: fmt.Sprintf("Task %s delete success", req.ID),
91+
}
92+
Response(c, successResp)
93+
}

processor/processor.go

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Proc struct {
2626
NowMem uint64 // 当前内存占用
2727
NowCPU float64 // 当前CPU占用
2828
Running bool // 是否正在运行
29+
PreDelete bool // 预删除标识
2930
CMD *exec.Cmd
3031
InPipe *io.WriteCloser
3132
Task *task.Task
@@ -85,6 +86,11 @@ func NewProc(name string) *Proc {
8586
return proc
8687
}
8788

89+
// SetPreDelete 设置预删除标识,二次请求取消标识
90+
func (p *Proc) SetPreDelete() {
91+
p.PreDelete = !p.PreDelete
92+
}
93+
8894
// ForceClose This method will directly kill the process
8995
func (p *Proc) ForceClose() error {
9096
return p.CMD.Process.Kill()

scheduler/crud.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package scheduler
2+
3+
import (
4+
"fmt"
5+
"rscheduler/processor"
6+
)
7+
8+
func FindProcByProcID(id string) *processor.Proc {
9+
RScheduler.Lock.RLock()
10+
defer RScheduler.Lock.RUnlock()
11+
12+
for _, procList := range RScheduler.M {
13+
for i := procList.Front(); i != nil; i = i.Next() {
14+
proc := i.Value.(*processor.Proc)
15+
if proc.ID == id {
16+
return proc
17+
}
18+
}
19+
}
20+
21+
return nil
22+
}
23+
24+
func FindProcByTaskID(id string) *processor.Proc {
25+
RScheduler.Lock.RLock()
26+
defer RScheduler.Lock.RUnlock()
27+
28+
for _, procList := range RScheduler.M {
29+
for i := procList.Front(); i != nil; i = i.Next() {
30+
proc := i.Value.(*processor.Proc)
31+
if proc.Task != nil && proc.Task.ID == id {
32+
return proc
33+
}
34+
}
35+
}
36+
37+
return nil
38+
}
39+
40+
func KillProcByProcID(id string, force bool) error {
41+
RScheduler.Lock.Lock()
42+
defer RScheduler.Lock.Unlock()
43+
44+
for _, procList := range RScheduler.M {
45+
for i := procList.Front(); i != nil; i = i.Next() {
46+
proc := i.Value.(*processor.Proc)
47+
if proc.ID == id {
48+
if force || proc.IsIdle() {
49+
if err := proc.ForceClose(); err != nil {
50+
return err
51+
}
52+
procList.Remove(i)
53+
return nil
54+
}
55+
proc.SetPreDelete()
56+
return nil
57+
}
58+
}
59+
}
60+
61+
return fmt.Errorf("can not find proc by proc id: %s", id)
62+
}
63+
64+
func KillProcByTaskID(id string) error {
65+
RScheduler.Lock.Lock()
66+
defer RScheduler.Lock.Unlock()
67+
68+
for _, procList := range RScheduler.M {
69+
for i := procList.Front(); i != nil; i = i.Next() {
70+
proc := i.Value.(*processor.Proc)
71+
if proc.Task != nil && proc.Task.ID == id {
72+
if err := proc.ForceClose(); err != nil {
73+
return err
74+
}
75+
procList.Remove(i)
76+
return nil
77+
}
78+
}
79+
}
80+
81+
return fmt.Errorf("can not find proc by task id: %s", id)
82+
}

scheduler/scheduler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (rs *rScheduler) TaskComplete(taskName, taskID string, kill bool) {
8282
if proc.Task != nil && proc.Task.ID == taskID {
8383
proc.Complete()
8484
// judge whether to kill the processor
85-
if !rs.enableIdleProc() || !proc.MemCheck() {
85+
if !rs.enableIdleProc() || !proc.MemCheck() || proc.PreDelete {
8686
kill = true
8787
}
8888
if kill {

0 commit comments

Comments
 (0)