Skip to content

Commit

Permalink
use context to wait for cancelation on GetObject() (#1666)
Browse files Browse the repository at this point in the history
it is potentially possible that caller has canceled
the context, however the GetObject() go-routine will
be stuck forever since no one is reading anymore.
  • Loading branch information
harshavardhana committed Jun 17, 2022
1 parent ff482a1 commit ad1e06e
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions api-get-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
return nil, err
}

gctx, cancel := context.WithCancel(ctx)

// Detect if snowball is server location we are talking to.
var snowball bool
if location, ok := c.bucketLocCache.Get(bucketName); ok {
Expand All @@ -59,8 +61,6 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
reqCh := make(chan getRequest)
// Create response channel.
resCh := make(chan getResponse)
// Create done channel.
doneCh := make(chan struct{})

// This routine feeds partial object data as and when the caller reads.
go func() {
Expand All @@ -73,8 +73,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Loop through the incoming control messages and read data.
for {
select {
// When the done channel is closed exit our routine.
case <-doneCh:
// When context is closed exit our routine.
case <-gctx.Done():
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
Expand All @@ -98,7 +98,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
} else if req.Offset > 0 {
opts.SetRange(req.Offset, 0)
}
httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o

// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
objectInfo, err = c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts))
objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
Expand All @@ -163,7 +163,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
objectInfo, err := c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts))
objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Remove range header if already set
delete(opts.headers, "Range")
}
httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{
Error: err,
Expand Down Expand Up @@ -250,7 +250,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
}()

// Create a newObject through the information sent back by reqCh.
return newObject(reqCh, resCh, doneCh), nil
return newObject(gctx, cancel, reqCh, resCh), nil
}

// get request message container to communicate with internal
Expand Down Expand Up @@ -283,7 +283,8 @@ type Object struct {
// User allocated and defined.
reqCh chan<- getRequest
resCh <-chan getResponse
doneCh chan<- struct{}
ctx context.Context
cancel context.CancelFunc
currOffset int64
objectInfo ObjectInfo

Expand Down Expand Up @@ -311,7 +312,12 @@ type Object struct {
// as any error encountered. For all first requests sent on the object
// it is also responsible for sending back the objectInfo.
func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
o.reqCh <- request
select {
case <-o.ctx.Done():
return getResponse{}, o.ctx.Err()
case o.reqCh <- request:
}

response := <-o.resCh

// Return any error to the top level.
Expand Down Expand Up @@ -615,7 +621,7 @@ func (o *Object) Close() (err error) {
}

// Close successfully.
close(o.doneCh)
o.cancel()

// Save for future operations.
errMsg := "Object is already closed. Bad file descriptor."
Expand All @@ -627,12 +633,13 @@ func (o *Object) Close() (err error) {

// newObject instantiates a new *minio.Object*
// ObjectInfo will be set by setObjectInfo
func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
func newObject(ctx context.Context, cancel context.CancelFunc, reqCh chan<- getRequest, resCh <-chan getResponse) *Object {
return &Object{
ctx: ctx,
cancel: cancel,
mutex: &sync.Mutex{},
reqCh: reqCh,
resCh: resCh,
doneCh: doneCh,
}
}

Expand Down

0 comments on commit ad1e06e

Please sign in to comment.