Skip to content

Commit 7b46c29

Browse files
committed
feat: epoll managered by runtime netpoller
1 parent bb9c3f7 commit 7b46c29

File tree

4 files changed

+130
-9
lines changed

4 files changed

+130
-9
lines changed

poll_default_bsd.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@ import (
2525
"unsafe"
2626
)
2727

28+
func openPollFile() (int, error) {
29+
return syscall.Kqueue()
30+
}
31+
2832
func openPoll() (Poll, error) {
2933
return openDefaultPoll()
3034
}
3135

3236
func openDefaultPoll() (*defaultPoll, error) {
3337
l := new(defaultPoll)
34-
p, err := syscall.Kqueue()
38+
p, err := openPollFile()
3539
if err != nil {
3640
return nil, err
3741
}

poll_default_linux.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@ package netpoll
1616

1717
import (
1818
"errors"
19-
"runtime"
19+
"fmt"
2020
"sync"
2121
"sync/atomic"
2222
"syscall"
2323
"unsafe"
2424
)
2525

26+
func openPollFile() (int, error) {
27+
return EpollCreate(0)
28+
}
29+
2630
func openPoll() (Poll, error) {
2731
return openDefaultPoll()
2832
}
@@ -31,11 +35,17 @@ func openDefaultPoll() (*defaultPoll, error) {
3135
var poll = new(defaultPoll)
3236

3337
poll.buf = make([]byte, 8)
34-
var p, err = EpollCreate(0)
38+
var p, err = openPollFile()
3539
if err != nil {
3640
return nil, err
3741
}
3842
poll.fd = p
43+
// register epollfd into runtime's netpoller
44+
pd, errno := runtime_pollOpen(uintptr(poll.fd))
45+
if errno != 0 {
46+
return nil, Exception(ErrUnsupported, fmt.Sprintf("when poll open: errno=%d", errno))
47+
}
48+
poll.pd = pd
3949

4050
var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
4151
if e0 != 0 {
@@ -60,6 +70,7 @@ func openDefaultPoll() (*defaultPoll, error) {
6070
type defaultPoll struct {
6171
pollArgs
6272
fd int // epoll fd
73+
pd uintptr // the pollDesc of epoll fd in runtime's netpoller
6374
wop *FDOperator // eventfd, wake epoll_wait
6475
buf []byte // read wfd trigger msg
6576
trigger uint32 // trigger flag
@@ -90,23 +101,28 @@ func (a *pollArgs) reset(size, caps int) {
90101
// Wait implements Poll.
91102
func (p *defaultPoll) Wait() (err error) {
92103
// init
93-
var caps, msec, n = barriercap, -1, 0
104+
var caps, n = barriercap, 0
94105
p.Reset(128, caps)
95106
// wait
96107
for {
97108
if n == p.size && p.size < 128*1024 {
98109
p.Reset(p.size<<1, caps)
99110
}
100-
n, err = EpollWait(p.fd, p.events, msec)
111+
n, err = EpollWait(p.fd, p.events, 0)
101112
if err != nil && err != syscall.EINTR {
102113
return err
103114
}
104-
if n <= 0 {
105-
msec = -1
106-
runtime.Gosched()
115+
if n == 0 {
116+
errno := runtime_pollReset(p.pd, 'r')
117+
if errno != 0 {
118+
return Exception(ErrUnsupported, fmt.Sprintf("when poll reset: errno=%d", errno))
119+
}
120+
errno = runtime_pollWait(p.pd, 'r')
121+
if errno != 0 {
122+
return Exception(ErrUnsupported, fmt.Sprintf("when poll wait: errno=%d", errno))
123+
}
107124
continue
108125
}
109-
msec = 0
110126
if p.Handler(p.events[:n]) {
111127
return nil
112128
}

runtime.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2024 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package netpoll
16+
17+
import (
18+
_ "unsafe"
19+
)
20+
21+
//go:linkname runtime_pollOpen internal/poll.runtime_pollOpen
22+
func runtime_pollOpen(fd uintptr) (pd uintptr, errno int)
23+
24+
//go:linkname runtime_pollWait internal/poll.runtime_pollWait
25+
func runtime_pollWait(pd uintptr, mode int) (errno int)
26+
27+
//go:linkname runtime_pollReset internal/poll.runtime_pollReset
28+
func runtime_pollReset(pd uintptr, mode int) (errno int)

runtime_linux_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2024 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build linux || loong64
16+
// +build linux loong64
17+
18+
package netpoll
19+
20+
import (
21+
"syscall"
22+
"testing"
23+
"time"
24+
)
25+
26+
func TestRuntimeNetpoller(t *testing.T) {
27+
pfd, err := openPollFile()
28+
MustNil(t, err)
29+
30+
pd, errno := runtime_pollOpen(uintptr(pfd))
31+
Assert(t, errno == 0, errno)
32+
t.Logf("poll open success: pd=%d", pd)
33+
34+
var rfd, wfd = GetSysFdPairs()
35+
36+
eventin := &epollevent{
37+
events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR,
38+
data: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
39+
}
40+
err = EpollCtl(pfd, syscall.EPOLL_CTL_ADD, rfd, eventin)
41+
MustNil(t, err)
42+
43+
go func() {
44+
time.Sleep(time.Millisecond * 100)
45+
46+
iovec := [1]syscall.Iovec{}
47+
buf := []byte("hello")
48+
n, err := writev(wfd, [][]byte{buf}, iovec[:])
49+
MustNil(t, err)
50+
Equal(t, n, 5)
51+
t.Logf("poll read success: %s", string(buf[:n]))
52+
}()
53+
54+
begin := time.Now()
55+
errno = runtime_pollWait(pd, 'r'+'w')
56+
Assert(t, errno == 0, errno)
57+
cost := time.Since(begin)
58+
Assert(t, cost.Milliseconds() >= 100)
59+
60+
events := make([]epollevent, 1)
61+
n, err := EpollWait(pfd, events, 0)
62+
MustNil(t, err)
63+
Equal(t, n, 1)
64+
t.Logf("poll wait success")
65+
66+
iovec := [1]syscall.Iovec{}
67+
buf := make([]byte, 1024)
68+
bs := [1][]byte{buf}
69+
n, err = readv(rfd, bs[:], iovec[:])
70+
MustNil(t, err)
71+
Equal(t, n, 5)
72+
t.Logf("poll read success: %s", string(buf[:n]))
73+
}

0 commit comments

Comments
 (0)