Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecated and moved writeflusher impl to internal pkg #47718

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 102 additions & 0 deletions internal/writeflusher/legacywriteflusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package writeflusher

import (
"io"
"sync"
)

// Deprecated: use the internal WriteFlusher instead.
// This is the old implementation that lived in ioutils.

// This struct and all funcs below used to live in the pkg/ioutils package
//
// LegacyWriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
//
// Deprecated: use the internal writeflusher.WriteFlusher instead
type LegacyWriteFlusher struct {
w io.Writer
flusher flusher
flushed chan struct{}
flushedOnce sync.Once
closed chan struct{}
closeLock sync.Mutex
}

// NewLegacyWriteFlusher returns a new LegacyWriteFlusher.
//
// Deprecated: use the internal writeflusher.NewWriteFlusher() instead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add the deprecation and removal notice to https://github.com/docker/cli/blob/master/docs/deprecated.md? @thaJeztah

While not strictly an API, if we're bothering with the deprecation-cycle anyway, perhaps it wouldn't hurt to call it out in our docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR was created as a solution to the "superfluous call to writeheader" issue. It seemed like a good moment to go ahead and deprecate the old writeflusher implementation which was exposing some known buggy functionality (

func (wf *WriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
)

If we merge #47796, this PR will no longer be necessary unless we still want to deprecate the old writeflusher implementation

func NewLegacyWriteFlusher(w io.Writer) *LegacyWriteFlusher {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still used in a bunch of places in API, which makes the linter complain:

api/server/router/build/build_routes.go:234:12: SA1019: ioutils.NewWriteFlusher is deprecated: use the internal/writeflusher WriteFlusher instead. (staticcheck)
	output := ioutils.NewWriteFlusher(ww)
...

we should also replace these with the new implementation.

var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &LegacyWriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
}

// Deprecated: use the internal writeflusher.Write() instead
func (wf *LegacyWriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
//
// Deprecated: use the internal writeflusher.Flush() instead
func (wf *LegacyWriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush()
}

// Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false.
//
// Deprecated: use the internal writeflusher.WriteFlusher instead
func (wf *LegacyWriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
//
// Deprecated: use the internal writeflusher.Close() instead
func (wf *LegacyWriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
89 changes: 89 additions & 0 deletions internal/writeflusher/writeflusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package writeflusher

import (
"io"
"net/http"
"sync"
)

type flusher interface {
Flush()
}

var errWriteFlusherClosed = io.EOF

// NopFlusher represents a type which flush operation is nop.
type NopFlusher struct{}

// Flush is a nop operation.
func (f *NopFlusher) Flush() {}

// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
closed chan struct{}
closeLock sync.Mutex
firstFlush sync.Once
}

// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
}

func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

// Here we call WriteHeader() if the io.Writer is an http.ResponseWriter to ensure that we don't try
// to send headers multiple times if the writer has already been wrapped by OTEL instrumentation
// (which doesn't wrap the Flush() func. See https://github.com/moby/moby/issues/47448)
wf.firstFlush.Do(func() {
if rw, ok := wf.w.(http.ResponseWriter); ok {
rw.WriteHeader(http.StatusOK)
}
})

wf.flusher.Flush()
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
90 changes: 6 additions & 84 deletions pkg/ioutils/writeflusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,13 @@ package ioutils // import "github.com/docker/docker/pkg/ioutils"

import (
"io"
"sync"
)

// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
flushed chan struct{}
flushedOnce sync.Once
closed chan struct{}
closeLock sync.Mutex
}

type flusher interface {
Flush()
}

var errWriteFlusherClosed = io.EOF

func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush()
}

// Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false.
func (wf *WriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}
"github.com/docker/docker/internal/writeflusher"
)

// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
//
// Deprecated: use the internal/writeflusher WriteFlusher instead.
func NewWriteFlusher(w io.Writer) *writeflusher.LegacyWriteFlusher {
return writeflusher.NewLegacyWriteFlusher(w)
}