Skip to content

Commit

Permalink
Deprecated and moved writeflusher impl to internal pkg as LegacyWrite…
Browse files Browse the repository at this point in the history
…Flusher - Created new internal writeflusher

Signed-off-by: Christopher Petito <47751006+krissetto@users.noreply.github.com>
  • Loading branch information
krissetto committed Apr 15, 2024
1 parent 8d5d655 commit 0ad6eb4
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 88 deletions.
102 changes: 102 additions & 0 deletions internal/writeflusher/legacywriteflusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package writeflusher

import (
"io"
"sync"
)

// Deprecated: use the internal WriteFlusher instead.
// This is the old implementation that lived in ioutils.

// This struct and all funcs below used to live in the pkg/ioutils package
//
// LegacyWriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
//
// Deprecated: use WriteFlusher instead
type LegacyWriteFlusher struct {
w io.Writer
flusher flusher
flushed chan struct{}
flushedOnce sync.Once
closed chan struct{}
closeLock sync.Mutex
}

// NewLegacyWriteFlusher returns a new LegacyWriteFlusher.
//
// Deprecated: use WriteFlusher instead
func NewLegacyWriteFlusher(w io.Writer) *LegacyWriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &LegacyWriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
}

// Deprecated: use WriteFlusher instead
func (wf *LegacyWriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
//
// Deprecated: use WriteFlusher instead
func (wf *LegacyWriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush()
}

// Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false.
//
// Deprecated: use WriteFlusher instead
func (wf *LegacyWriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
//
// Deprecated: use WriteFlusher instead
func (wf *LegacyWriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
83 changes: 83 additions & 0 deletions internal/writeflusher/writeflusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package writeflusher

import (
"io"
"sync"
)

type flusher interface {
Flush()
}

var errWriteFlusherClosed = io.EOF

// NopFlusher represents a type which flush operation is nop.
type NopFlusher struct{}

// Flush is a nop operation.
func (f *NopFlusher) Flush() {}

// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
closed chan struct{}
closeLock sync.Mutex
}

// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
}

func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

// Here we Write() to ensure WriteHeader() gets called appropriately
// even when the ResponseWriter has already been wrapped by OTEL instrumentation
// (which doesn't wrap the Flush() func. See https://github.com/moby/moby/pull/47715)
wf.w.Write([]byte{})

wf.flusher.Flush()
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
92 changes: 10 additions & 82 deletions pkg/ioutils/writeflusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,19 @@ package ioutils // import "github.com/docker/docker/pkg/ioutils"

import (
"io"
"sync"
)

// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
flushed chan struct{}
flushedOnce sync.Once
closed chan struct{}
closeLock sync.Mutex
}

type flusher interface {
Flush()
}

var errWriteFlusherClosed = io.EOF

func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush()
}

// Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false.
func (wf *WriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
}
"github.com/docker/docker/internal/writeflusher"
)

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()
// NopFlusher represents a type which flush operation is nop.
type NopFlusher struct{}

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
// Flush is a nop operation.
func (f *NopFlusher) Flush() {}

// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
//
// Deprecated: use the internal/writeflusher WriteFlusher instead.
func NewWriteFlusher(w io.Writer) *writeflusher.LegacyWriteFlusher {
return writeflusher.NewLegacyWriteFlusher(w)
}
6 changes: 0 additions & 6 deletions pkg/ioutils/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}

// NopFlusher represents a type which flush operation is nop.
type NopFlusher struct{}

// Flush is a nop operation.
func (f *NopFlusher) Flush() {}

type writeCloserWrapper struct {
io.Writer
closer func() error
Expand Down

0 comments on commit 0ad6eb4

Please sign in to comment.