Skip to content

Commit

Permalink
copy: check if writer was closed before setting a pipe
Browse files Browse the repository at this point in the history
If Close is called externally before a request is attempted, then we
will accidentally attempt to send to a closed channel, causing a panic.

To avoid this, we can check to see if Close has been called, using a
done channel. If this channel is ever done, we drop any incoming errors,
requests or pipes - we don't need them, since we're done.

Signed-off-by: Justin Chadwell <me@jedevc.com>
  • Loading branch information
jedevc committed Mar 4, 2024
1 parent 2a25c08 commit d081da8
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions remotes/docker/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,12 @@ type pushWriter struct {

pipe *io.PipeWriter

pipeC chan *io.PipeWriter
respC chan *http.Response
done chan struct{}
closeOnce sync.Once
errC chan error

pipeC chan *io.PipeWriter
respC chan *http.Response
errC chan error

isManifest bool

Expand All @@ -356,19 +358,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
pipeC: make(chan *io.PipeWriter, 1),
respC: make(chan *http.Response, 1),
errC: make(chan error, 1),
done: make(chan struct{}),
isManifest: isManifest,
}
}

func (pw *pushWriter) setPipe(p *io.PipeWriter) {
pw.pipeC <- p
select {
case <-pw.done:
case pw.pipeC <- p:
}
}

func (pw *pushWriter) setError(err error) {
pw.errC <- err
select {
case <-pw.done:
case pw.errC <- err:
}
}

func (pw *pushWriter) setResponse(resp *http.Response) {
pw.respC <- resp
select {
case <-pw.done:
case pw.respC <- resp:
}
}

func (pw *pushWriter) Write(p []byte) (n int, err error) {
Expand All @@ -378,22 +391,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
}

if pw.pipe == nil {
p, ok := <-pw.pipeC
if !ok {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.pipe = p
}
pw.pipe = p
} else {
select {
case p, ok := <-pw.pipeC:
if !ok {
return 0, io.ErrClosedPipe
}
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written and the caller must reset
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return 0, err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
Expand Down Expand Up @@ -422,7 +439,7 @@ func (pw *pushWriter) Close() error {
// Ensure pipeC is closed but handle `Close()` being
// called multiple times without panicking
pw.closeOnce.Do(func() {
close(pw.pipeC)
close(pw.done)
})
if pw.pipe != nil {
status, err := pw.tracker.GetStatus(pw.ref)
Expand Down Expand Up @@ -462,17 +479,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
// TODO: timeout waiting for response
var resp *http.Response
select {
case <-pw.done:
return io.ErrClosedPipe
case err := <-pw.errC:
return err
case resp = <-pw.respC:
defer resp.Body.Close()
case p, ok := <-pw.pipeC:
case p := <-pw.pipeC:
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
if !ok {
return io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

Expand Down

0 comments on commit d081da8

Please sign in to comment.