-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Implement deadline for `Stream` * chore: code cleanup * fix: buffer release * fix: do not use buffer for `cmdUpdatePaddingScheme` --------- Co-authored-by: anytls <anytls>
- Loading branch information
Showing
7 changed files
with
352 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package pipe | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
// PipeDeadline is an abstraction for handling timeouts. | ||
type PipeDeadline struct { | ||
mu sync.Mutex // Guards timer and cancel | ||
timer *time.Timer | ||
cancel chan struct{} // Must be non-nil | ||
} | ||
|
||
func MakePipeDeadline() PipeDeadline { | ||
return PipeDeadline{cancel: make(chan struct{})} | ||
} | ||
|
||
// Set sets the point in time when the deadline will time out. | ||
// A timeout event is signaled by closing the channel returned by waiter. | ||
// Once a timeout has occurred, the deadline can be refreshed by specifying a | ||
// t value in the future. | ||
// | ||
// A zero value for t prevents timeout. | ||
func (d *PipeDeadline) Set(t time.Time) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
|
||
if d.timer != nil && !d.timer.Stop() { | ||
<-d.cancel // Wait for the timer callback to finish and close cancel | ||
} | ||
d.timer = nil | ||
|
||
// Time is zero, then there is no deadline. | ||
closed := isClosedChan(d.cancel) | ||
if t.IsZero() { | ||
if closed { | ||
d.cancel = make(chan struct{}) | ||
} | ||
return | ||
} | ||
|
||
// Time in the future, setup a timer to cancel in the future. | ||
if dur := time.Until(t); dur > 0 { | ||
if closed { | ||
d.cancel = make(chan struct{}) | ||
} | ||
d.timer = time.AfterFunc(dur, func() { | ||
close(d.cancel) | ||
}) | ||
return | ||
} | ||
|
||
// Time in the past, so close immediately. | ||
if !closed { | ||
close(d.cancel) | ||
} | ||
} | ||
|
||
// Wait returns a channel that is closed when the deadline is exceeded. | ||
func (d *PipeDeadline) Wait() chan struct{} { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
return d.cancel | ||
} | ||
|
||
func isClosedChan(c <-chan struct{}) bool { | ||
select { | ||
case <-c: | ||
return true | ||
default: | ||
return false | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
// Copyright 2009 The Go Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
// Pipe adapter to connect code expecting an io.Reader | ||
// with code expecting an io.Writer. | ||
|
||
package pipe | ||
|
||
import ( | ||
"io" | ||
"os" | ||
"sync" | ||
"time" | ||
) | ||
|
||
// onceError is an object that will only store an error once. | ||
type onceError struct { | ||
sync.Mutex // guards following | ||
err error | ||
} | ||
|
||
func (a *onceError) Store(err error) { | ||
a.Lock() | ||
defer a.Unlock() | ||
if a.err != nil { | ||
return | ||
} | ||
a.err = err | ||
} | ||
func (a *onceError) Load() error { | ||
a.Lock() | ||
defer a.Unlock() | ||
return a.err | ||
} | ||
|
||
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. | ||
type pipe struct { | ||
wrMu sync.Mutex // Serializes Write operations | ||
wrCh chan []byte | ||
rdCh chan int | ||
|
||
once sync.Once // Protects closing done | ||
done chan struct{} | ||
rerr onceError | ||
werr onceError | ||
|
||
readDeadline PipeDeadline | ||
writeDeadline PipeDeadline | ||
} | ||
|
||
func (p *pipe) read(b []byte) (n int, err error) { | ||
select { | ||
case <-p.done: | ||
return 0, p.readCloseError() | ||
case <-p.readDeadline.Wait(): | ||
return 0, os.ErrDeadlineExceeded | ||
default: | ||
} | ||
|
||
select { | ||
case bw := <-p.wrCh: | ||
nr := copy(b, bw) | ||
p.rdCh <- nr | ||
return nr, nil | ||
case <-p.done: | ||
return 0, p.readCloseError() | ||
case <-p.readDeadline.Wait(): | ||
return 0, os.ErrDeadlineExceeded | ||
} | ||
} | ||
|
||
func (p *pipe) closeRead(err error) error { | ||
if err == nil { | ||
err = io.ErrClosedPipe | ||
} | ||
p.rerr.Store(err) | ||
p.once.Do(func() { close(p.done) }) | ||
return nil | ||
} | ||
|
||
func (p *pipe) write(b []byte) (n int, err error) { | ||
select { | ||
case <-p.done: | ||
return 0, p.writeCloseError() | ||
case <-p.writeDeadline.Wait(): | ||
return 0, os.ErrDeadlineExceeded | ||
default: | ||
p.wrMu.Lock() | ||
defer p.wrMu.Unlock() | ||
} | ||
|
||
for once := true; once || len(b) > 0; once = false { | ||
select { | ||
case p.wrCh <- b: | ||
nw := <-p.rdCh | ||
b = b[nw:] | ||
n += nw | ||
case <-p.done: | ||
return n, p.writeCloseError() | ||
case <-p.writeDeadline.Wait(): | ||
return n, os.ErrDeadlineExceeded | ||
} | ||
} | ||
return n, nil | ||
} | ||
|
||
func (p *pipe) closeWrite(err error) error { | ||
if err == nil { | ||
err = io.EOF | ||
} | ||
p.werr.Store(err) | ||
p.once.Do(func() { close(p.done) }) | ||
return nil | ||
} | ||
|
||
// readCloseError is considered internal to the pipe type. | ||
func (p *pipe) readCloseError() error { | ||
rerr := p.rerr.Load() | ||
if werr := p.werr.Load(); rerr == nil && werr != nil { | ||
return werr | ||
} | ||
return io.ErrClosedPipe | ||
} | ||
|
||
// writeCloseError is considered internal to the pipe type. | ||
func (p *pipe) writeCloseError() error { | ||
werr := p.werr.Load() | ||
if rerr := p.rerr.Load(); werr == nil && rerr != nil { | ||
return rerr | ||
} | ||
return io.ErrClosedPipe | ||
} | ||
|
||
// A PipeReader is the read half of a pipe. | ||
type PipeReader struct{ pipe } | ||
|
||
// Read implements the standard Read interface: | ||
// it reads data from the pipe, blocking until a writer | ||
// arrives or the write end is closed. | ||
// If the write end is closed with an error, that error is | ||
// returned as err; otherwise err is EOF. | ||
func (r *PipeReader) Read(data []byte) (n int, err error) { | ||
return r.pipe.read(data) | ||
} | ||
|
||
// Close closes the reader; subsequent writes to the | ||
// write half of the pipe will return the error [ErrClosedPipe]. | ||
func (r *PipeReader) Close() error { | ||
return r.CloseWithError(nil) | ||
} | ||
|
||
// CloseWithError closes the reader; subsequent writes | ||
// to the write half of the pipe will return the error err. | ||
// | ||
// CloseWithError never overwrites the previous error if it exists | ||
// and always returns nil. | ||
func (r *PipeReader) CloseWithError(err error) error { | ||
return r.pipe.closeRead(err) | ||
} | ||
|
||
// A PipeWriter is the write half of a pipe. | ||
type PipeWriter struct{ r PipeReader } | ||
|
||
// Write implements the standard Write interface: | ||
// it writes data to the pipe, blocking until one or more readers | ||
// have consumed all the data or the read end is closed. | ||
// If the read end is closed with an error, that err is | ||
// returned as err; otherwise err is [ErrClosedPipe]. | ||
func (w *PipeWriter) Write(data []byte) (n int, err error) { | ||
return w.r.pipe.write(data) | ||
} | ||
|
||
// Close closes the writer; subsequent reads from the | ||
// read half of the pipe will return no bytes and EOF. | ||
func (w *PipeWriter) Close() error { | ||
return w.CloseWithError(nil) | ||
} | ||
|
||
// CloseWithError closes the writer; subsequent reads from the | ||
// read half of the pipe will return no bytes and the error err, | ||
// or EOF if err is nil. | ||
// | ||
// CloseWithError never overwrites the previous error if it exists | ||
// and always returns nil. | ||
func (w *PipeWriter) CloseWithError(err error) error { | ||
return w.r.pipe.closeWrite(err) | ||
} | ||
|
||
// Pipe creates a synchronous in-memory pipe. | ||
// It can be used to connect code expecting an [io.Reader] | ||
// with code expecting an [io.Writer]. | ||
// | ||
// Reads and Writes on the pipe are matched one to one | ||
// except when multiple Reads are needed to consume a single Write. | ||
// That is, each Write to the [PipeWriter] blocks until it has satisfied | ||
// one or more Reads from the [PipeReader] that fully consume | ||
// the written data. | ||
// The data is copied directly from the Write to the corresponding | ||
// Read (or Reads); there is no internal buffering. | ||
// | ||
// It is safe to call Read and Write in parallel with each other or with Close. | ||
// Parallel calls to Read and parallel calls to Write are also safe: | ||
// the individual calls will be gated sequentially. | ||
// | ||
// Added SetReadDeadline and SetWriteDeadline methods based on `io.Pipe`. | ||
func Pipe() (*PipeReader, *PipeWriter) { | ||
pw := &PipeWriter{r: PipeReader{pipe: pipe{ | ||
wrCh: make(chan []byte), | ||
rdCh: make(chan int), | ||
done: make(chan struct{}), | ||
readDeadline: MakePipeDeadline(), | ||
writeDeadline: MakePipeDeadline(), | ||
}}} | ||
return &pw.r, pw | ||
} | ||
|
||
func (p *PipeReader) SetReadDeadline(t time.Time) error { | ||
if isClosedChan(p.done) { | ||
return io.ErrClosedPipe | ||
} | ||
p.readDeadline.Set(t) | ||
return nil | ||
} | ||
|
||
func (p *PipeWriter) SetWriteDeadline(t time.Time) error { | ||
if isClosedChan(p.r.done) { | ||
return io.ErrClosedPipe | ||
} | ||
p.r.writeDeadline.Set(t) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.