Skip to content

Commit ffdc9fe

Browse files
committed
adjust queue
1 parent 05457c3 commit ffdc9fe

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

config/config.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
"Schema": "test",
55
"S3Region" : "us-east-1",
66
"ParallelNumber": 4,
7-
"MaxExecutorNumber":30,
87
"MaxConcurrentTaskNumber": 4,
9-
"QueueSize": 100
8+
"MaxQueueSize": 100
109
},
1110

1211
"FileConnectorConfigs":{

scheduler/queue.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,32 @@ func (self *Queue) Delete(task *Task) error {
7575
return nil
7676
}
7777

78-
func (self *Queue) Add(task *Task) {
78+
func (self *Queue) AddForce(task *Task) error {
7979
self.Lock()
8080
defer self.Unlock()
8181

8282
ln := len(self.Tasks)
83-
if ln >= int(self.MaxQueueSize) && self.Tasks.Len() > 0 {
83+
if ln >= int(self.MaxQueueSize) {
8484
self.Tasks[0] = task
8585
} else {
8686
self.Tasks = append(self.Tasks, task)
8787
}
8888
sort.Sort(self.Tasks)
89+
return nil
90+
}
91+
92+
func (self *Queue) Add(task *Task) error {
93+
self.Lock()
94+
defer self.Unlock()
95+
96+
ln := len(self.Tasks)
97+
if ln >= int(self.MaxQueueSize) {
98+
return fmt.Errorf("queue is full")
99+
} else {
100+
self.Tasks = append(self.Tasks, task)
101+
}
102+
sort.Sort(self.Tasks)
103+
return nil
89104
}
90105

91106
func (self *Queue) HasTask(taskId string) bool {

scheduler/scheduler.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ func (self *Scheduler) AddTask(task *Task) (err error) {
7474
defer self.Unlock()
7575

7676
if task.Status == pb.TaskStatus_TODO {
77-
self.TodoQueue.Add(task)
77+
return self.TodoQueue.Add(task)
7878
} else if task.Status == pb.TaskStatus_ERROR {
79-
self.ErrorQueue.Add(task)
79+
return self.ErrorQueue.AddForce(task)
8080
} else if task.Status == pb.TaskStatus_SUCCEED {
81-
self.SucceedQueue.Add(task)
81+
return self.SucceedQueue.AddForce(task)
8282
} else if task.Status == pb.TaskStatus_RUNNING {
83-
self.RunningQueue.Add(task)
83+
return self.RunningQueue.Add(task)
8484
} else {
8585
return fmt.Errorf("unknown task status")
8686
}

0 commit comments

Comments
 (0)