-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
111 lines (95 loc) · 1.74 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package queue
import (
"errors"
"fmt"
"sync"
"time"
)
var emptyQueue = func() error {
return errors.New("empty queue")
}
var fullQueue = errors.New("full queue")
type Queue struct {
sync.RWMutex
fs []func() error
stop chan struct{}
max, head, tail int
tick time.Duration
}
func NewQueue(capacity int, tick time.Duration) *Queue {
return &Queue{
fs: make([]func() error, capacity),
stop: make(chan struct{}),
max: capacity + 1,
tick: tick,
}
}
func (dq *Queue) Run() {
ticker := time.NewTicker(dq.tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f := dq.Pop()
go f.Do()
case <-dq.stop:
close(dq.stop)
dq.Lock()
dq.fs, dq.head, dq.tail = nil, 0, 0
dq.Unlock()
return
}
}
}
func (dq *Queue) Close() {
defer func() {
dq.stop <- struct{}{}
}()
}
func (dq *Queue) Put(f Executor) error {
dq.Lock()
defer dq.Unlock()
if dq.isFull() {
return fullQueue
}
dq.fs[dq.tail] = f
dq.tail = (dq.tail + 1) % dq.max
return nil
}
func (dq *Queue) Pop() Executor {
dq.Lock()
defer dq.Unlock()
if dq.isEmpty() {
return emptyQueue
}
f := dq.fs[dq.head]
dq.head = (dq.head + 1) % dq.max
return f
}
func (dq *Queue) Size() int {
dq.RLock()
defer dq.RUnlock()
return dq.size()
}
func (dq *Queue) size() int {
return (dq.tail + dq.max - dq.head) % dq.max
}
func (dq *Queue) Show() {
fmt.Printf("head=%d,tail=%d,size=%d\n", dq.head, dq.tail, dq.size())
}
func (dq *Queue) IsEmpty() bool {
dq.RLock()
defer dq.RUnlock()
return dq.isEmpty()
}
func (dq *Queue) isEmpty() bool {
return dq.tail == dq.head
}
func (dq *Queue) IsFull() bool {
dq.RLock()
defer dq.RUnlock()
return dq.isFull()
}
func (dq *Queue) isFull() bool {
return (dq.tail+1)%dq.max == dq.head
}