-
Notifications
You must be signed in to change notification settings - Fork 18.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Deprecated and moved writeflusher impl to internal pkg as LegacyWrite…
…Flusher - Created new internal writeflusher Signed-off-by: Christopher Petito <47751006+krissetto@users.noreply.github.com>
- Loading branch information
Showing
3 changed files
with
191 additions
and
84 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package writeflusher | ||
|
||
import ( | ||
"io" | ||
"sync" | ||
) | ||
|
||
// Deprecated: use the internal WriteFlusher instead. | ||
// This is the old implementation that lived in ioutils. | ||
|
||
// This struct and all funcs below used to live in the pkg/ioutils package | ||
// | ||
// LegacyWriteFlusher wraps the Write and Flush operation ensuring that every write | ||
// is a flush. In addition, the Close method can be called to intercept | ||
// Read/Write calls if the targets lifecycle has already ended. | ||
// | ||
// Deprecated: use WriteFlusher instead | ||
type LegacyWriteFlusher struct { | ||
w io.Writer | ||
flusher flusher | ||
flushed chan struct{} | ||
flushedOnce sync.Once | ||
closed chan struct{} | ||
closeLock sync.Mutex | ||
} | ||
|
||
// NewLegacyWriteFlusher returns a new LegacyWriteFlusher. | ||
// | ||
// Deprecated: use WriteFlusher instead | ||
func NewLegacyWriteFlusher(w io.Writer) *LegacyWriteFlusher { | ||
var fl flusher | ||
if f, ok := w.(flusher); ok { | ||
fl = f | ||
} else { | ||
fl = &NopFlusher{} | ||
} | ||
return &LegacyWriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})} | ||
} | ||
|
||
// Deprecated: use WriteFlusher instead | ||
func (wf *LegacyWriteFlusher) Write(b []byte) (n int, err error) { | ||
select { | ||
case <-wf.closed: | ||
return 0, errWriteFlusherClosed | ||
default: | ||
} | ||
|
||
n, err = wf.w.Write(b) | ||
wf.Flush() // every write is a flush. | ||
return n, err | ||
} | ||
|
||
// Flush the stream immediately. | ||
// | ||
// Deprecated: use WriteFlusher instead | ||
func (wf *LegacyWriteFlusher) Flush() { | ||
select { | ||
case <-wf.closed: | ||
return | ||
default: | ||
} | ||
|
||
wf.flushedOnce.Do(func() { | ||
close(wf.flushed) | ||
}) | ||
wf.flusher.Flush() | ||
} | ||
|
||
// Flushed returns the state of flushed. | ||
// If it's flushed, return true, or else it return false. | ||
// | ||
// Deprecated: use WriteFlusher instead | ||
func (wf *LegacyWriteFlusher) Flushed() bool { | ||
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to | ||
// be used to detect whether or a response code has been issued or not. | ||
// Another hook should be used instead. | ||
var flushed bool | ||
select { | ||
case <-wf.flushed: | ||
flushed = true | ||
default: | ||
} | ||
return flushed | ||
} | ||
|
||
// Close closes the write flusher, disallowing any further writes to the | ||
// target. After the flusher is closed, all calls to write or flush will | ||
// result in an error. | ||
// | ||
// Deprecated: use WriteFlusher instead | ||
func (wf *LegacyWriteFlusher) Close() error { | ||
wf.closeLock.Lock() | ||
defer wf.closeLock.Unlock() | ||
|
||
select { | ||
case <-wf.closed: | ||
return errWriteFlusherClosed | ||
default: | ||
close(wf.closed) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package writeflusher | ||
|
||
import ( | ||
"io" | ||
"sync" | ||
) | ||
|
||
type flusher interface { | ||
Flush() | ||
} | ||
|
||
var errWriteFlusherClosed = io.EOF | ||
|
||
// NopFlusher represents a type which flush operation is nop. | ||
type NopFlusher struct{} | ||
|
||
// Flush is a nop operation. | ||
func (f *NopFlusher) Flush() {} | ||
|
||
// WriteFlusher wraps the Write and Flush operation ensuring that every write | ||
// is a flush. In addition, the Close method can be called to intercept | ||
// Read/Write calls if the targets lifecycle has already ended. | ||
type WriteFlusher struct { | ||
w io.Writer | ||
flusher flusher | ||
closed chan struct{} | ||
closeLock sync.Mutex | ||
} | ||
|
||
// NewWriteFlusher returns a new WriteFlusher. | ||
func NewWriteFlusher(w io.Writer) *WriteFlusher { | ||
var fl flusher | ||
if f, ok := w.(flusher); ok { | ||
fl = f | ||
} else { | ||
fl = &NopFlusher{} | ||
} | ||
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})} | ||
} | ||
|
||
func (wf *WriteFlusher) Write(b []byte) (n int, err error) { | ||
select { | ||
case <-wf.closed: | ||
return 0, errWriteFlusherClosed | ||
default: | ||
} | ||
|
||
n, err = wf.w.Write(b) | ||
wf.Flush() // every write is a flush. | ||
return n, err | ||
} | ||
|
||
// Flush the stream immediately. | ||
func (wf *WriteFlusher) Flush() { | ||
select { | ||
case <-wf.closed: | ||
return | ||
default: | ||
} | ||
|
||
// Here we Write() to ensure WriteHeader() gets called appropriately | ||
// even when the ResponseWriter has already been wrapped by OTEL instrumentation | ||
// (which doesn't wrap the Flush() func. See https://github.com/moby/moby/pull/47715) | ||
wf.w.Write([]byte{}) | ||
|
||
wf.flusher.Flush() | ||
} | ||
|
||
// Close closes the write flusher, disallowing any further writes to the | ||
// target. After the flusher is closed, all calls to write or flush will | ||
// result in an error. | ||
func (wf *WriteFlusher) Close() error { | ||
wf.closeLock.Lock() | ||
defer wf.closeLock.Unlock() | ||
|
||
select { | ||
case <-wf.closed: | ||
return errWriteFlusherClosed | ||
default: | ||
close(wf.closed) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters