-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathevtwebsocket.go
147 lines (132 loc) · 2.92 KB
/
evtwebsocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package evtwebsocket
import (
"errors"
"time"
"golang.org/x/net/websocket"
)
// Conn is the connection structure.
type Conn struct {
OnMessage func([]byte, *Conn)
OnError func(error)
OnConnected func(*Conn)
MatchMsg func([]byte, []byte) bool
Reconnect bool
PingMsg []byte
PingIntervalSecs int
ws *websocket.Conn
url string
subprotocol string
closed bool
msgQueue []Msg
pingTimer time.Time
}
// Msg is the message structure.
type Msg struct {
Body []byte
Callback func([]byte, *Conn)
}
// Dial sets up the connection with the remote
// host provided in the url parameter.
// Note that all the parameters of the structure
// must have been set before calling it.
func (c *Conn) Dial(url, subprotocol string) error {
c.closed = true
c.url = url
c.subprotocol = subprotocol
c.msgQueue = []Msg{}
var err error
c.ws, err = websocket.Dial(url, subprotocol, "http://localhost/")
if err != nil {
return err
}
c.closed = false
if c.OnConnected != nil {
go c.OnConnected(c)
}
go func() {
defer c.close()
for {
var msg = make([]byte, 512)
var n int
if n, err = c.ws.Read(msg); err != nil {
if c.OnError != nil {
c.OnError(err)
}
return
}
c.onMsg(msg[:n])
}
}()
c.setupPing()
return nil
}
// Send sends a message through the connection.
func (c *Conn) Send(msg Msg) error {
if c.closed {
return errors.New("closed connection")
}
if _, err := c.ws.Write(msg.Body); err != nil {
c.close()
if c.OnError != nil {
c.OnError(err)
}
return err
}
if c.PingIntervalSecs > 0 && c.PingMsg != nil {
c.pingTimer = time.Now().Add(time.Second * time.Duration(c.PingIntervalSecs))
}
if msg.Callback != nil {
c.msgQueue = append(c.msgQueue, msg)
}
return nil
}
// IsConnected tells wether the connection is
// opened or closed.
func (c *Conn) IsConnected() bool {
return !c.closed
}
func (c *Conn) onMsg(msg []byte) {
if c.MatchMsg != nil {
for i, m := range c.msgQueue {
if m.Callback != nil && c.MatchMsg(msg, m.Body) {
go m.Callback(msg, c)
// Delete this element from the queue
c.msgQueue = append(c.msgQueue[:i], c.msgQueue[i+1:]...)
break
}
}
}
// Fire OnMessage every time.
if c.OnMessage != nil {
go c.OnMessage(msg, c)
}
}
func (c *Conn) close() {
c.ws.Close()
c.closed = true
if c.Reconnect {
for {
if err := c.Dial(c.url, c.subprotocol); err == nil {
break
}
time.Sleep(time.Second * 1)
}
}
}
func (c *Conn) setupPing() {
if c.PingIntervalSecs > 0 && len(c.PingMsg) > 0 {
c.pingTimer = time.Now().Add(time.Second * time.Duration(c.PingIntervalSecs))
go func() {
for {
if !time.Now().After(c.pingTimer) {
time.Sleep(time.Millisecond * 100)
continue
}
if c.Send(Msg{c.PingMsg, nil}) != nil {
return
}
c.pingTimer = time.Now().Add(time.Second * time.Duration(c.PingIntervalSecs))
}
}()
}
}