-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconncontext.go
243 lines (223 loc) · 5.72 KB
/
conncontext.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package netx
import (
"fmt"
"io"
"net"
"sync/atomic"
"time"
"context"
"sync"
"github.com/coffeehc/logger"
)
const (
contextKeyID = "__ID__"
contextKeySyncHandler = "__syncHandler__"
//contextKey
)
//ConnContext 建立建立后的上下文
type ConnContext interface {
GetID() int64
AddListen(name string, listen ContextListen) error
RemoveListen(name string)
GetListens() map[string]ContextListen
RemoteAddr() net.Addr
LocalAddr() net.Addr
IsOpen() bool
SetHandler(handler Handler)
SetProtocols(protocols ...Protocol)
Start(cxt context.Context)
Close(cxt context.Context) error
Write(cxt context.Context, data interface{})
FireException(cxt context.Context, err error)
}
//初始化一个新的上下文
//id:指定的上下文编号
//conn:网络连接
//workPool:用于控制并行处理工作池,主要用于标记
//orderHandler:标记该上下文处理数据的时候是否顺序处理,默认为并行,否则为当前数据处理完之后再处理接下来的数据
func newConnContext(cxt context.Context, id int64, conn net.Conn, workPool chan int64, syncHandle bool) ConnContext {
encoder, decoder := newProtocolChain(new(emptyProtocol))
connContext := &_ConnContext{
id: id,
conn: conn,
listens: make(map[string]ContextListen, 0),
workPool: workPool,
writing: 0,
once: new(sync.Once),
encoder: encoder,
decoder: decoder,
syncHandle: syncHandle,
}
return connContext
}
type _ConnContext struct {
conn net.Conn // socket connection
handler Handler //biz Handler
encoder *_ProtocolHandler
decoder *_ProtocolHandler
isOpen bool //通道是否打开
listens map[string]ContextListen
workPool chan int64
writing int32 //关闭的时候用于标记剩余多少数据没有写完
once *sync.Once
id int64 //channel id
syncHandle bool
}
//GetID 获取上下文对应的ID
func (c *_ConnContext) GetID() int64 {
return c.id
}
func (c *_ConnContext) GetListens() map[string]ContextListen {
return c.listens
}
//AddListen 添加监听器
func (c *_ConnContext) AddListen(name string, listen ContextListen) error {
if _, ok := c.listens[name]; ok {
return fmt.Errorf("监听器[%s]已经存在,不能添加", name)
}
c.listens[name] = listen
return nil
}
//RemoveListen 删除监听器
func (c *_ConnContext) RemoveListen(name string) {
delete(c.listens, name)
}
//RemoteAddr 获取远程地址
func (c *_ConnContext) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
//LocalAddr 获取本地地址
func (c *_ConnContext) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
//IsOpen 上下文是否打开
func (c *_ConnContext) IsOpen() bool {
return c.isOpen
}
//SetHandler 设置Handler
func (c *_ConnContext) SetHandler(handler Handler) {
c.handler = handler
}
//SetProtocols 设置Protocol
func (c *_ConnContext) SetProtocols(protocols ...Protocol) {
for _, protocol := range protocols {
encoder, decoder := newProtocolChain(protocol)
encoder.next = c.encoder
c.encoder = encoder
c.decoder.addNextChain(decoder)
}
}
//开始处理上下文
func (c *_ConnContext) Start(cxt context.Context) {
c.once.Do(func() {
c.isOpen = true
defer func() {
if err := recover(); err != nil {
logger.Error("处理数据出现了异常:%s", err)
}
}()
c.handler.Active(cxt, c)
go func(connContext *_ConnContext) {
defer func() {
if err := recover(); err != nil {
logger.Error("系统异常:%s", err)
}
}()
for _, l := range connContext.listens {
l.OnActive(cxt, connContext)
}
}(c)
go func() {
//TODO 此处要优化,最好使用buf池
bytes := make([]byte, 1500)
//TODO 加入读取或者写入超时的限制
for c.isOpen {
i, err := c.conn.Read(bytes)
if err != nil {
if err == io.EOF {
c.Close(cxt)
continue
}
if opErr, ok := err.(*net.OpError); ok {
if !opErr.Timeout() && !opErr.Temporary() {
logger.Error("接收到不可恢复的异常,关闭连接,%s", err)
c.Close(cxt)
}
} else {
c.FireException(cxt, fmt.Errorf("接收内容异常,%#v", err))
}
continue
}
if i > 0 {
c.decoder.Fire(cxt, c, bytes[:i])
}
}
}()
})
}
/*
处理异常
*/
func (c *_ConnContext) FireException(cxt context.Context, err error) {
logger.Debug("获取了一个异常事件:%s", err)
c.handler.Exception(cxt, c, err)
go func(cxt context.Context, connContext ConnContext, err error) {
defer func() {
if err := recover(); err != nil {
logger.Error("系统异常:%s", err)
}
}()
for _, l := range connContext.GetListens() {
l.OnException(cxt, connContext, err)
}
}(cxt, c, err)
}
//Write 写入数据
func (c *_ConnContext) Write(cxt context.Context, data interface{}) {
if !c.isOpen {
logger.Warn("通道已经关闭,不能发送数据")
return
}
atomic.AddInt32(&c.writing, 1)
defer func() {
if err := recover(); err != nil {
logger.Error("发送数据异常,%s", err)
}
atomic.AddInt32(&c.writing, -1)
}()
c.encoder.Fire(cxt, c, data)
}
//Close 关闭上下文,包括关闭连接等
func (c *_ConnContext) Close(cxt context.Context) error {
if c.isOpen {
logger.Info("开始关闭连接")
c.isOpen = false
if c.writing != 0 {
for i := 0; i <= 1000; i++ {
time.Sleep(time.Millisecond * 10)
if c.writing == 0 {
break
}
}
}
err := c.conn.Close()
if err != nil {
c.FireException(cxt, err)
return err
}
logger.Info("关闭了连接,%s", c.conn.RemoteAddr().String())
c.handler.Close(cxt, c)
c.encoder.Destroy()
go func(this *_ConnContext) {
defer func() {
if err := recover(); err != nil {
logger.Error("系统异常:%s", err)
}
}()
for _, l := range this.listens {
l.OnClose(cxt, this)
}
}(c)
}
return nil
}