Skip to content

Commit 3a190d6

Browse files
committed
Added Wake to connection
1 parent 3db0020 commit 3a190d6

File tree

4 files changed

+56
-6
lines changed

4 files changed

+56
-6
lines changed

evio.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ type Conn interface {
6060
LocalAddr() net.Addr
6161
// RemoteAddr is the connection's remote peer address.
6262
RemoteAddr() net.Addr
63+
// Wake triggers a Data event for this connection.
64+
Wake()
6365
}
6466

6567
// LoadBalance sets the load balancing method.

evio_std.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (c *stdudpconn) SetContext(ctx interface{}) {}
4040
func (c *stdudpconn) AddrIndex() int { return c.addrIndex }
4141
func (c *stdudpconn) LocalAddr() net.Addr { return c.localAddr }
4242
func (c *stdudpconn) RemoteAddr() net.Addr { return c.remoteAddr }
43+
func (c *stdudpconn) Wake() {}
4344

4445
type stdloop struct {
4546
idx int // loop index
@@ -59,16 +60,22 @@ type stdconn struct {
5960
done int32 // 0: attached, 1: closed, 2: detached
6061
}
6162

63+
type wakeReq struct {
64+
c *stdconn
65+
}
66+
6267
func (c *stdconn) Context() interface{} { return c.ctx }
6368
func (c *stdconn) SetContext(ctx interface{}) { c.ctx = ctx }
6469
func (c *stdconn) AddrIndex() int { return c.addrIndex }
6570
func (c *stdconn) LocalAddr() net.Addr { return c.localAddr }
6671
func (c *stdconn) RemoteAddr() net.Addr { return c.remoteAddr }
72+
func (c *stdconn) Wake() { c.loop.ch <- wakeReq{c} }
6773

6874
type stdin struct {
6975
c *stdconn
7076
in []byte
7177
}
78+
7279
type stderr struct {
7380
c *stdconn
7481
err error
@@ -267,6 +274,8 @@ func stdloopRun(s *stdserver, l *stdloop) {
267274
err = stdloopReadUDP(s, l, v)
268275
case *stderr:
269276
err = stdloopError(s, l, v.c, v.err)
277+
case wakeReq:
278+
err = stdloopRead(s, l, v.c, nil)
270279
}
271280
}
272281
if err != nil {
@@ -360,6 +369,8 @@ func (c *stddetachedConn) Close() error {
360369
return c.conn.Close()
361370
}
362371

372+
func (c *stddetachedConn) Wake() {}
373+
363374
func stdloopRead(s *stdserver, l *stdloop, c *stdconn, in []byte) error {
364375
if atomic.LoadInt32(&c.done) == 2 {
365376
// should not ignore reads for detached connections

evio_unix.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,28 @@ import (
2222
type conn struct {
2323
fd int // file descriptor
2424
lnidx int // listener index in the server lns list
25-
loopidx int // owner loop
2625
out []byte // write buffer
2726
sa syscall.Sockaddr // remote socket address
2827
reuse bool // should reuse input buffer
2928
opened bool // connection opened event fired
3029
action Action // next user action
3130
ctx interface{} // user-defined context
32-
addrIndex int
33-
localAddr net.Addr
34-
remoteAddr net.Addr
31+
addrIndex int // index of listening address
32+
localAddr net.Addr // local addre
33+
remoteAddr net.Addr // remote addr
34+
loop *loop // connected loop
3535
}
3636

3737
func (c *conn) Context() interface{} { return c.ctx }
3838
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
3939
func (c *conn) AddrIndex() int { return c.addrIndex }
4040
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
4141
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
42+
func (c *conn) Wake() {
43+
if c.loop != nil {
44+
c.loop.poll.Trigger(c)
45+
}
46+
}
4247

4348
type server struct {
4449
events Events // user events
@@ -198,6 +203,12 @@ func loopNote(s *server, l *loop, note interface{}) error {
198203
s.tch <- delay
199204
case error: // shutdown
200205
err = v
206+
case *conn:
207+
// Wake called for connection
208+
if l.fdconns[v.fd] != v {
209+
return nil // ignore stale wakes
210+
}
211+
return loopWake(s, l, v)
201212
}
202213
return err
203214
}
@@ -258,7 +269,8 @@ func loopAccept(s *server, l *loop, fd int) error {
258269
}
259270
}
260271
case RoundRobin:
261-
if int(atomic.LoadUintptr(&s.accepted))%len(s.loops) != l.idx {
272+
idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops)
273+
if idx != l.idx {
262274
return nil // do not accept
263275
}
264276
atomic.AddUintptr(&s.accepted, 1)
@@ -277,7 +289,7 @@ func loopAccept(s *server, l *loop, fd int) error {
277289
if err := syscall.SetNonblock(nfd, true); err != nil {
278290
return err
279291
}
280-
c := &conn{fd: nfd, sa: sa, lnidx: i}
292+
c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
281293
l.fdconns[c.fd] = c
282294
l.poll.AddReadWrite(c.fd)
283295
atomic.AddInt32(&l.count, 1)
@@ -391,6 +403,21 @@ func loopAction(s *server, l *loop, c *conn) error {
391403
return nil
392404
}
393405

406+
func loopWake(s *server, l *loop, c *conn) error {
407+
if s.events.Data == nil {
408+
return nil
409+
}
410+
out, action := s.events.Data(c, nil)
411+
c.action = action
412+
if len(out) > 0 {
413+
c.out = append([]byte{}, out...)
414+
}
415+
if len(c.out) != 0 || c.action != None {
416+
l.poll.ModReadWrite(c.fd)
417+
}
418+
return nil
419+
}
420+
394421
func loopRead(s *server, l *loop, c *conn) error {
395422
var in []byte
396423
n, err := syscall.Read(c.fd, l.packet)

examples/redis-server/main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,20 @@ func main() {
5858
return
5959
}
6060
events.Opened = func(ec evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
61+
//fmt.Printf("opened: %v\n", ec.RemoteAddr())
6162
ec.SetContext(&conn{})
6263
return
6364
}
6465
events.Closed = func(ec evio.Conn, err error) (action evio.Action) {
66+
// fmt.Printf("closed: %v\n", ec.RemoteAddr())
6567
return
6668
}
69+
6770
events.Data = func(ec evio.Conn, in []byte) (out []byte, action evio.Action) {
71+
if in == nil {
72+
log.Printf("wake from %s\n", ec.RemoteAddr())
73+
return nil, evio.Close
74+
}
6875
c := ec.Context().(*conn)
6976
data := c.is.Begin(in)
7077
var n int
@@ -94,6 +101,9 @@ func main() {
94101
} else {
95102
out = redcon.AppendString(out, "PONG")
96103
}
104+
case "WAKE":
105+
go ec.Wake()
106+
out = redcon.AppendString(out, "OK")
97107
case "ECHO":
98108
if len(args) != 2 {
99109
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")

0 commit comments

Comments
 (0)