Skip to content

Commit

Permalink
Created new WriteFlusher in /internal with otel related fixes (call W…
Browse files Browse the repository at this point in the history
…rite() on first Flush())

Signed-off-by: Christopher Petito <47751006+krissetto@users.noreply.github.com>
  • Loading branch information
krissetto committed Apr 17, 2024
1 parent 7decc85 commit b4ebe28
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions internal/writeflusher/writeflusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package writeflusher

import (
"io"
"sync"
)

type flusher interface {
Flush()
}

var errWriteFlusherClosed = io.EOF

// NopFlusher represents a type which flush operation is nop.
type NopFlusher struct{}

// Flush is a nop operation.
func (f *NopFlusher) Flush() {}

// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
closed chan struct{}
closeLock sync.Mutex
firstFlush sync.Once
}

// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{})}
}

func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
select {
case <-wf.closed:
return 0, errWriteFlusherClosed
default:
}

n, err = wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}

// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}

// Here we Write() to ensure WriteHeader() gets called appropriately during the first flush
// even when the ResponseWriter has already been wrapped by OTEL instrumentation
// (which doesn't wrap the Flush() func. See https://github.com/moby/moby/pull/47715)
wf.firstFlush.Do(func() {
wf.w.Write([]byte{})
})

wf.flusher.Flush()
}

// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()

select {
case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
}
return nil
}

0 comments on commit b4ebe28

Please sign in to comment.