Skip to content

feat: add WithReadThreshold API #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// The wait read size large than read threshold
ErrReadExceedThreshold = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -110,11 +112,12 @@ func (e *exception) Temporary() bool {

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrReadExceedThreshold: "connection read size exceeds the threshold",
}
81 changes: 50 additions & 31 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,22 @@ type connection struct {
netFD
onEvent
locker
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially.
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially.
readBufferThreshold int64 // The readBufferThreshold limit the size of connection inputBuffer. In bytes.
}

var (
Expand Down Expand Up @@ -95,6 +96,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
return nil
}

// SetReadBufferThreshold implements Connection.
func (c *connection) SetReadBufferThreshold(threshold int64) error {
c.readBufferThreshold = threshold
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -396,28 +403,41 @@ func (c *connection) triggerWrite(err error) {
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
goto CLEANUP
}
// cannot wait read with an out of threshold size
if c.readBufferThreshold > 0 && int64(n) > c.readBufferThreshold {
// just return error and dont do cleanup
return Exception(ErrReadExceedThreshold, "wait read")
}

atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
err = c.waitReadWithTimeout(n)
goto CLEANUP
}
// wait full n
for c.inputBuffer.Len() < n {
for c.inputBuffer.Len() < n && err == nil {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
err = Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
err = Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
}
return nil
CLEANUP:
atomic.StoreInt64(&c.waitReadSize, 0)
if c.readBufferThreshold > 0 && err == nil {
// only resume read when current read size could make newBufferSize < readBufferThreshold
bufferSize := int64(c.inputBuffer.Len())
newBufferSize := bufferSize - int64(n)
if bufferSize >= c.readBufferThreshold && newBufferSize < c.readBufferThreshold {
c.resumeRead()
}
}
return err
}

// waitReadWithTimeout will wait full n bytes or until timeout.
Expand Down Expand Up @@ -485,11 +505,10 @@ func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}

// no need to check if resume write successfully
// if resume failed, the connection will be triggered triggerWrite(err), and waitFlush will return err
c.resumeWrite()
return c.waitFlush()
}

Expand Down Expand Up @@ -522,8 +541,8 @@ func (c *connection) waitFlush() (err error) {
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
// we cannot flush it again, since we don't know if the poller is still processing outputBuffer
c.pauseWrite()
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
1 change: 1 addition & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
c.SetReadBufferThreshold(opts.readBufferThreshold)

// calling prepare first and then register.
if opts.onPrepare != nil {
Expand Down
53 changes: 48 additions & 5 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (c *connection) closeBuffer() {

// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
// trigger throttle
if c.readBufferThreshold > 0 && int64(c.inputBuffer.Len()) >= c.readBufferThreshold {
c.pauseRead()
return
}

vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
Expand All @@ -108,6 +114,11 @@ func (c *connection) inputAck(n int) (err error) {
c.maxSize = mallocMax
}

// trigger throttle
if c.readBufferThreshold > 0 && int64(length) >= c.readBufferThreshold {
c.pauseRead()
}

var needTrigger = true
if length == n { // first start onRequest
needTrigger = c.onRequest()
Expand All @@ -121,7 +132,8 @@ func (c *connection) inputAck(n int) (err error) {
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
if c.outputBuffer.IsEmpty() {
c.rw2r()
c.pauseWrite()
c.triggerWrite(nil)
return rs, c.supportZeroCopy
}
rs = c.outputBuffer.GetBytes(vs)
Expand All @@ -135,13 +147,44 @@ func (c *connection) outputAck(n int) (err error) {
c.outputBuffer.Release()
}
if c.outputBuffer.IsEmpty() {
c.rw2r()
c.pauseWrite()
c.triggerWrite(nil)
}
return nil
}

// rw2r removed the monitoring of write events.
func (c *connection) rw2r() {
/* The race description of operator event monitoring
- Pause operation will remove old event monitor of operator
- Resume operation will add new event monitor of operator
- Only poller could use Pause to remove event monitor, and poller already hold the op.do() locker
- Only user could use Resume, and user's operation maybe compete with poller's operation
- If competition happen, because of all resume operation will monitor all events, it's safe to do that with a race condition.
* If resume first and pause latter, poller will monitor the accurate events it needs.
* If pause first and resume latter, poller will monitor the duplicate events which will be removed after next poller triggered.
And poller will ensure to remove the duplicate events.
- If there is no readBufferThreshold option, the code path will be more simple and efficient.
*/

// pauseWrite removed the monitoring of write events.
// pauseWrite used in poller
func (c *connection) pauseWrite() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil)
}

// resumeWrite add the monitoring of write events.
// resumeWrite used by users
func (c *connection) resumeWrite() {
c.operator.Control(PollR2RW)
}

// pauseRead removed the monitoring of read events.
// pauseRead used in poller
func (c *connection) pauseRead() {
c.operator.Control(PollRW2W)
}

// resumeRead add the monitoring of read events.
// resumeRead used by users
func (c *connection) resumeRead() {
c.operator.Control(PollW2RW)
}
Loading