Skip to content

Commit 09dbbd1

Browse files
committed
feat: add WithReadThreshold API
1 parent 9707178 commit 09dbbd1

16 files changed

+446
-58
lines changed

connection_errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const (
3535
ErrEOF = syscall.Errno(0x106)
3636
// Write I/O buffer timeout, calling by Connection.Writer
3737
ErrWriteTimeout = syscall.Errno(0x107)
38+
// The wait read size large than read threshold
39+
ErrReadOutOfThreshold = syscall.Errno(0x108)
3840
)
3941

4042
const ErrnoMask = 0xFF
@@ -97,4 +99,5 @@ var errnos = [...]string{
9799
ErrnoMask & ErrUnsupported: "netpoll dose not support",
98100
ErrnoMask & ErrEOF: "EOF",
99101
ErrnoMask & ErrWriteTimeout: "connection write timeout",
102+
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
100103
}

connection_impl.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ type connection struct {
4545
outputBuffer *LinkBuffer
4646
outputBarrier *barrier
4747
supportZeroCopy bool
48-
maxSize int // The maximum size of data between two Release().
49-
bookSize int // The size of data that can be read at once.
48+
maxSize int // The maximum size of data between two Release().
49+
bookSize int // The size of data that can be read at once.
50+
readThreshold int64 // The readThreshold of connection max read.
5051
}
5152

5253
var (
@@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
9495
return nil
9596
}
9697

98+
// SetReadThreshold implements Connection.
99+
func (c *connection) SetReadThreshold(readThreshold int64) error {
100+
c.readThreshold = readThreshold
101+
return nil
102+
}
103+
97104
// ------------------------------------------ implement zero-copy reader ------------------------------------------
98105

99106
// Next implements Connection.
@@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
394401
// waitRead will wait full n bytes.
395402
func (c *connection) waitRead(n int) (err error) {
396403
if n <= c.inputBuffer.Len() {
397-
return nil
404+
goto CLEANUP
398405
}
406+
// cannot wait read with an out of threshold size
407+
if c.readThreshold > 0 && int64(n) > c.readThreshold {
408+
// just return error and dont do cleanup
409+
return Exception(ErrReadOutOfThreshold, "wait read")
410+
}
411+
399412
atomic.StoreInt64(&c.waitReadSize, int64(n))
400-
defer atomic.StoreInt64(&c.waitReadSize, 0)
401413
if c.readTimeout > 0 {
402-
return c.waitReadWithTimeout(n)
414+
err = c.waitReadWithTimeout(n)
415+
goto CLEANUP
403416
}
404417
// wait full n
405418
for c.inputBuffer.Len() < n {
406419
switch c.status(closing) {
407420
case poller:
408-
return Exception(ErrEOF, "wait read")
421+
err = Exception(ErrEOF, "wait read")
409422
case user:
410-
return Exception(ErrConnClosed, "wait read")
423+
err = Exception(ErrConnClosed, "wait read")
411424
default:
412425
err = <-c.readTrigger
413-
if err != nil {
414-
return err
415-
}
426+
}
427+
if err != nil {
428+
goto CLEANUP
416429
}
417430
}
418-
return nil
431+
CLEANUP:
432+
atomic.StoreInt64(&c.waitReadSize, 0)
433+
if c.readThreshold > 0 && err == nil {
434+
// only resume read when current read size could make newBufferSize < readThreshold
435+
bufferSize := int64(c.inputBuffer.Len())
436+
newBufferSize := bufferSize - int64(n)
437+
if bufferSize >= c.readThreshold && newBufferSize < c.readThreshold {
438+
c.resumeRead()
439+
}
440+
}
441+
return err
419442
}
420443

421444
// waitReadWithTimeout will wait full n bytes or until timeout.

connection_onevent.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
103103
c.SetReadTimeout(opts.readTimeout)
104104
c.SetWriteTimeout(opts.writeTimeout)
105105
c.SetIdleTimeout(opts.idleTimeout)
106+
c.SetReadThreshold(opts.readThreshold)
106107

107108
// calling prepare first and then register.
108109
if opts.onPrepare != nil {

connection_reactor.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ func (c *connection) inputAck(n int) (err error) {
104104
c.maxSize = mallocMax
105105
}
106106

107+
// trigger throttle
108+
if c.readThreshold > 0 && int64(length) >= c.readThreshold {
109+
c.pauseRead()
110+
}
111+
107112
var needTrigger = true
108113
if length == n { // first start onRequest
109114
needTrigger = c.onRequest()
@@ -138,6 +143,29 @@ func (c *connection) outputAck(n int) (err error) {
138143

139144
// rw2r removed the monitoring of write events.
140145
func (c *connection) rw2r() {
141-
c.operator.Control(PollRW2R)
146+
switch c.operator.getMode() {
147+
case opreadwrite:
148+
c.operator.Control(PollRW2R)
149+
case opwrite:
150+
c.operator.Control(PollW2RW)
151+
}
142152
c.triggerWrite(nil)
143153
}
154+
155+
func (c *connection) pauseRead() {
156+
switch c.operator.getMode() {
157+
case opread:
158+
c.operator.Control(PollR2Hup)
159+
case opreadwrite:
160+
c.operator.Control(PollRW2W)
161+
}
162+
}
163+
164+
func (c *connection) resumeRead() {
165+
switch c.operator.getMode() {
166+
case ophup:
167+
c.operator.Control(PollHup2R)
168+
case opwrite:
169+
c.operator.Control(PollW2RW)
170+
}
171+
}

connection_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,3 +675,95 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
675675
wg.Wait()
676676
}
677677
}
678+
679+
func TestConnectionReadOutOfThreshold(t *testing.T) {
680+
var readThreshold = 1024 * 100
681+
var readSize = readThreshold + 1
682+
var opts = &options{}
683+
var wg sync.WaitGroup
684+
wg.Add(1)
685+
opts.onRequest = func(ctx context.Context, connection Connection) error {
686+
if connection.Reader().Len() < readThreshold {
687+
return nil
688+
}
689+
defer wg.Done()
690+
// read throttled data
691+
_, err := connection.Reader().Next(readSize)
692+
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
693+
connection.Close()
694+
return nil
695+
}
696+
697+
WithReadThreshold(int64(readThreshold)).f(opts)
698+
r, w := GetSysFdPairs()
699+
rconn, wconn := &connection{}, &connection{}
700+
rconn.init(&netFD{fd: r}, opts)
701+
wconn.init(&netFD{fd: w}, opts)
702+
703+
msg := make([]byte, readThreshold)
704+
_, err := wconn.Writer().WriteBinary(msg)
705+
MustNil(t, err)
706+
err = wconn.Writer().Flush()
707+
MustNil(t, err)
708+
wg.Wait()
709+
}
710+
711+
func TestConnectionReadThreshold(t *testing.T) {
712+
var readThreshold int64 = 1024 * 100
713+
var opts = &options{}
714+
var wg sync.WaitGroup
715+
var throttled int32
716+
wg.Add(1)
717+
opts.onRequest = func(ctx context.Context, connection Connection) error {
718+
if int64(connection.Reader().Len()) < readThreshold {
719+
return nil
720+
}
721+
defer wg.Done()
722+
723+
atomic.StoreInt32(&throttled, 1)
724+
// check if no more read data when throttled
725+
inbuffered := connection.Reader().Len()
726+
t.Logf("Inbuffered: %d", inbuffered)
727+
time.Sleep(time.Millisecond * 100)
728+
Equal(t, inbuffered, connection.Reader().Len())
729+
730+
// read non-throttled data
731+
buf, err := connection.Reader().Next(int(readThreshold))
732+
Equal(t, int64(len(buf)), readThreshold)
733+
MustNil(t, err)
734+
err = connection.Reader().Release()
735+
MustNil(t, err)
736+
t.Logf("read non-throttled data")
737+
738+
// continue read throttled data
739+
buf, err = connection.Reader().Next(5)
740+
MustNil(t, err)
741+
t.Logf("read throttled data: [%s]", buf)
742+
Equal(t, len(buf), 5)
743+
MustNil(t, err)
744+
err = connection.Reader().Release()
745+
MustNil(t, err)
746+
Equal(t, connection.Reader().Len(), 0)
747+
return nil
748+
}
749+
750+
WithReadThreshold(readThreshold).f(opts)
751+
r, w := GetSysFdPairs()
752+
rconn, wconn := &connection{}, &connection{}
753+
rconn.init(&netFD{fd: r}, opts)
754+
wconn.init(&netFD{fd: w}, opts)
755+
Assert(t, rconn.readThreshold == readThreshold)
756+
757+
msg := make([]byte, readThreshold)
758+
_, err := wconn.Writer().WriteBinary(msg)
759+
MustNil(t, err)
760+
err = wconn.Writer().Flush()
761+
MustNil(t, err)
762+
_, err = wconn.Writer().WriteString("hello")
763+
MustNil(t, err)
764+
err = wconn.Writer().Flush()
765+
MustNil(t, err)
766+
t.Logf("flush final msg")
767+
768+
wg.Wait()
769+
}

docs/guide/guide_cn.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
519519
}
520520
```
521521

522+
## 8. 如何配置连接的读取阈值大小 ?
523+
524+
Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。
525+
526+
### Client 侧使用
527+
528+
```
529+
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
530+
conn, _ = dialer.DialConnection(network, address, timeout)
531+
```
532+
533+
### Server 侧使用
534+
535+
```
536+
eventLoop, _ := netpoll.NewEventLoop(
537+
handle,
538+
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
539+
)
540+
```
541+
522542
# 注意事项
523543

524544
## 1. 错误设置 NumLoops

docs/guide/guide_en.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,30 @@ func callback(connection netpoll.Connection) error {
558558
}
559559
```
560560

561+
## 8. How to configure the read threshold of the connection?
562+
563+
By default, Netpoll does not place any limit on the reading speed of data sent by the end.
564+
Whenever there have more data on the connection, Netpoll will read the data into its own buffer as quickly as possible.
565+
566+
But sometimes users may not want data to be read too quickly, or they want to control the service memory usage, or the user's OnRequest callback processing data very slowly and need to control the peer's send speed.
567+
In this case, you can use `WithReadThreshold` to control the maximum reading threshold.
568+
569+
### Client side use
570+
571+
```
572+
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
573+
conn, _ = dialer.DialConnection(network, address, timeout)
574+
```
575+
576+
### Server side use
577+
578+
```
579+
eventLoop, _ := netpoll.NewEventLoop(
580+
handle,
581+
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
582+
)
583+
```
584+
561585
# Attention
562586

563587
## 1. Wrong setting of NumLoops

fd_operator.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ import (
1919
"sync/atomic"
2020
)
2121

22+
const (
23+
opdetach int32 = -1
24+
_ int32 = 0 // default op mode, means nothing
25+
opread int32 = 1
26+
opwrite int32 = 2
27+
opreadwrite int32 = 3
28+
ophup int32 = 4
29+
)
30+
2231
// FDOperator is a collection of operations on file descriptors.
2332
type FDOperator struct {
2433
// FD is file descriptor, poll will bind when register.
@@ -42,8 +51,7 @@ type FDOperator struct {
4251
// poll is the registered location of the file descriptor.
4352
poll Poll
4453

45-
// protect only detach once
46-
detached int32
54+
mode int32
4755

4856
// private, used by operatorCache
4957
next *FDOperator
@@ -52,16 +60,21 @@ type FDOperator struct {
5260
}
5361

5462
func (op *FDOperator) Control(event PollEvent) error {
55-
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
56-
return nil
57-
}
5863
return op.poll.Control(op, event)
5964
}
6065

6166
func (op *FDOperator) Free() {
6267
op.poll.Free(op)
6368
}
6469

70+
func (op *FDOperator) getMode() int32 {
71+
return atomic.LoadInt32(&op.mode)
72+
}
73+
74+
func (op *FDOperator) setMode(mode int32) {
75+
atomic.StoreInt32(&op.mode, mode)
76+
}
77+
6578
func (op *FDOperator) do() (can bool) {
6679
return atomic.CompareAndSwapInt32(&op.state, 1, 2)
6780
}
@@ -98,5 +111,5 @@ func (op *FDOperator) reset() {
98111
op.Inputs, op.InputAck = nil, nil
99112
op.Outputs, op.OutputAck = nil, nil
100113
op.poll = nil
101-
op.detached = 0
114+
op.mode = 0
102115
}

0 commit comments

Comments
 (0)