Skip to content

Commit 584fada

Browse files
committed
initial import
0 parents  commit 584fada

14 files changed

+1309
-0
lines changed

Diff for: .gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* text=auto eol=lf

Diff for: .gitignore

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Ignore all
2+
*
3+
4+
# Unignore all with extensions
5+
!*.*
6+
!**/Dockerfile
7+
8+
# Unignore all dirs
9+
!*/
10+
!api
11+
12+
.idea
13+
**/.DS_Store
14+
**/logs
15+
!Makefile
16+
17+
# gitlab ci
18+
.cache
19+
20+
# vim auto backup file
21+
*~
22+
!OWNERS

Diff for: dq/config.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package dq
2+
3+
import "github.com/tal-tech/go-zero/core/stores/redis"
4+
5+
type (
6+
Beanstalk struct {
7+
Endpoint string
8+
Tube string
9+
}
10+
11+
DqConf struct {
12+
Beanstalks []Beanstalk
13+
Redis redis.RedisConf
14+
}
15+
)

Diff for: dq/connection.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package dq
2+
3+
import (
4+
"sync"
5+
6+
"github.com/beanstalkd/go-beanstalk"
7+
)
8+
9+
type connection struct {
10+
lock sync.RWMutex
11+
endpoint string
12+
tube string
13+
conn *beanstalk.Conn
14+
}
15+
16+
func newConnection(endpint, tube string) *connection {
17+
return &connection{
18+
endpoint: endpint,
19+
tube: tube,
20+
}
21+
}
22+
23+
func (c *connection) Close() error {
24+
c.lock.Lock()
25+
conn := c.conn
26+
c.conn = nil
27+
defer c.lock.Unlock()
28+
29+
if conn != nil {
30+
return conn.Close()
31+
}
32+
33+
return nil
34+
}
35+
36+
func (c *connection) get() (*beanstalk.Conn, error) {
37+
c.lock.RLock()
38+
conn := c.conn
39+
c.lock.RUnlock()
40+
if conn != nil {
41+
return conn, nil
42+
}
43+
44+
c.lock.Lock()
45+
defer c.lock.Unlock()
46+
47+
var err error
48+
c.conn, err = beanstalk.Dial("tcp", c.endpoint)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
c.conn.Tube.Name = c.tube
54+
return c.conn, err
55+
}
56+
57+
func (c *connection) reset() {
58+
c.lock.Lock()
59+
defer c.lock.Unlock()
60+
61+
if c.conn != nil {
62+
c.conn.Close()
63+
c.conn = nil
64+
}
65+
}

Diff for: dq/consumer.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package dq
2+
3+
import (
4+
"strconv"
5+
"time"
6+
7+
"github.com/tal-tech/go-zero/core/hash"
8+
"github.com/tal-tech/go-zero/core/logx"
9+
"github.com/tal-tech/go-zero/core/service"
10+
"github.com/tal-tech/go-zero/core/stores/redis"
11+
)
12+
13+
const (
14+
expiration = 3600 // seconds
15+
guardValue = "1"
16+
tolerance = time.Minute * 30
17+
)
18+
19+
var maxCheckBytes = getMaxTimeLen()
20+
21+
type (
22+
Consume func(body []byte)
23+
24+
Consumer interface {
25+
Consume(consume Consume)
26+
}
27+
28+
consumerCluster struct {
29+
nodes []*consumerNode
30+
red *redis.Redis
31+
}
32+
)
33+
34+
func NewConsumer(c DqConf) Consumer {
35+
var nodes []*consumerNode
36+
for _, node := range c.Beanstalks {
37+
nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube))
38+
}
39+
return &consumerCluster{
40+
nodes: nodes,
41+
red: c.Redis.NewRedis(),
42+
}
43+
}
44+
45+
func (c *consumerCluster) Consume(consume Consume) {
46+
guardedConsume := func(body []byte) {
47+
key := hash.Md5Hex(body)
48+
body, ok := c.unwrap(body)
49+
if !ok {
50+
logx.Errorf("discarded: %q", string(body))
51+
return
52+
}
53+
54+
ok, err := c.red.SetnxEx(key, guardValue, expiration)
55+
if err != nil {
56+
logx.Error(err)
57+
} else if ok {
58+
consume(body)
59+
}
60+
}
61+
62+
group := service.NewServiceGroup()
63+
for _, node := range c.nodes {
64+
group.Add(consumeService{
65+
c: node,
66+
consume: guardedConsume,
67+
})
68+
}
69+
group.Start()
70+
}
71+
72+
func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) {
73+
var pos = -1
74+
for i := 0; i < maxCheckBytes; i++ {
75+
if body[i] == timeSep {
76+
pos = i
77+
break
78+
}
79+
}
80+
if pos < 0 {
81+
return nil, false
82+
}
83+
84+
val, err := strconv.ParseInt(string(body[:pos]), 10, 64)
85+
if err != nil {
86+
logx.Error(err)
87+
return nil, false
88+
}
89+
90+
t := time.Unix(0, val)
91+
if t.Add(tolerance).Before(time.Now()) {
92+
return nil, false
93+
}
94+
95+
return body[pos+1:], true
96+
}
97+
98+
func getMaxTimeLen() int {
99+
return len(strconv.FormatInt(time.Now().UnixNano(), 10)) + 2
100+
}

Diff for: dq/consumernode.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package dq
2+
3+
import (
4+
"time"
5+
6+
"github.com/beanstalkd/go-beanstalk"
7+
"github.com/tal-tech/go-zero/core/logx"
8+
"github.com/tal-tech/go-zero/core/syncx"
9+
)
10+
11+
type (
12+
consumerNode struct {
13+
conn *connection
14+
tube string
15+
on *syncx.AtomicBool
16+
}
17+
18+
consumeService struct {
19+
c *consumerNode
20+
consume Consume
21+
}
22+
)
23+
24+
func newConsumerNode(endpoint, tube string) *consumerNode {
25+
return &consumerNode{
26+
conn: newConnection(endpoint, tube),
27+
tube: tube,
28+
on: syncx.ForAtomicBool(true),
29+
}
30+
}
31+
32+
func (c *consumerNode) dispose() {
33+
c.on.Set(false)
34+
}
35+
36+
func (c *consumerNode) consumeEvents(consume Consume) {
37+
for c.on.True() {
38+
conn, err := c.conn.get()
39+
if err != nil {
40+
logx.Error(err)
41+
time.Sleep(time.Second)
42+
continue
43+
}
44+
45+
// because getting conn takes at most one second, reserve tasks at most 5 seconds,
46+
// if don't check on/off here, the conn might not be closed due to
47+
// graceful shutdon waits at most 5.5 seconds.
48+
if !c.on.True() {
49+
break
50+
}
51+
52+
conn.Tube.Name = c.tube
53+
conn.TubeSet.Name[c.tube] = true
54+
id, body, err := conn.Reserve(reserveTimeout)
55+
if err == nil {
56+
conn.Delete(id)
57+
consume(body)
58+
continue
59+
}
60+
61+
// the error can only be beanstalk.NameError or beanstalk.ConnError
62+
switch cerr := err.(type) {
63+
case beanstalk.ConnError:
64+
switch cerr.Err {
65+
case beanstalk.ErrTimeout:
66+
// timeout error on timeout, just continue the loop
67+
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
68+
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
69+
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
70+
// won't reset
71+
logx.Error(err)
72+
default:
73+
// beanstalk.ErrOOM, beanstalk.ErrUnknown and other errors
74+
logx.Error(err)
75+
c.conn.reset()
76+
time.Sleep(time.Second)
77+
}
78+
default:
79+
logx.Error(err)
80+
}
81+
}
82+
83+
if err := c.conn.Close(); err != nil {
84+
logx.Error(err)
85+
}
86+
}
87+
88+
func (cs consumeService) Start() {
89+
cs.c.consumeEvents(cs.consume)
90+
}
91+
92+
func (cs consumeService) Stop() {
93+
cs.c.dispose()
94+
}

0 commit comments

Comments
 (0)