From c987a5bccb53c29eb344aad2b37f6209a8b256fd Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 1 Feb 2022 14:47:56 -0500 Subject: [PATCH] feat(gensupport): per-chunk deadline configs (#1414) Allow users to configure the per-chunk deadline for retries that's used during resumable uploads. Needs to be exposed via the manual layer for storage. Fixes #685 --- go.mod | 1 - googleapi/googleapi.go | 23 ++++++++++++++-- internal/gensupport/media.go | 16 ++++++----- internal/gensupport/media_test.go | 38 ++++++++++++++++++++++++++- internal/gensupport/resumable.go | 12 +++++++++ internal/gensupport/resumable_test.go | 22 +++++++++------- internal/gensupport/retry.go | 4 +-- 7 files changed, 95 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 65c6f6c45b5..6e9d79241d1 100644 --- a/go.mod +++ b/go.mod @@ -16,5 +16,4 @@ require ( google.golang.org/appengine v1.6.7 google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 google.golang.org/grpc v1.40.1 - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/googleapi/googleapi.go b/googleapi/googleapi.go index 1a79e6d533f..ae010adce9d 100644 --- a/googleapi/googleapi.go +++ b/googleapi/googleapi.go @@ -15,6 +15,7 @@ import ( "net/http" "net/url" "strings" + "time" "google.golang.org/api/internal/third_party/uritemplates" ) @@ -245,12 +246,30 @@ func ChunkSize(size int) MediaOption { return chunkSizeOption(size) } +type chunkRetryDeadlineOption time.Duration + +func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) { + o.ChunkRetryDeadline = time.Duration(cd) +} + +// ChunkRetryDeadline returns a MediaOption which sets a per-chunk retry +// deadline. If a single chunk has been attempting to upload for longer than +// this time and the request fails, it will no longer be retried, and the error +// will be returned to the caller. +// This is only applicable for files which are large enough to require +// a multi-chunk resumable upload. +// The default value is 32s. +// To set a deadline on the entire upload, use context timeout or cancellation. +func ChunkRetryDeadline(deadline time.Duration) MediaOption { + return chunkRetryDeadlineOption(deadline) +} + // MediaOptions stores options for customizing media upload. It is not used by developers directly. type MediaOptions struct { ContentType string ForceEmptyContentType bool - - ChunkSize int + ChunkSize int + ChunkRetryDeadline time.Duration } // ProcessMediaOptions stores options from opts in a MediaOptions. diff --git a/internal/gensupport/media.go b/internal/gensupport/media.go index 0460ab59406..d14a22470c1 100644 --- a/internal/gensupport/media.go +++ b/internal/gensupport/media.go @@ -15,6 +15,7 @@ import ( "net/textproto" "strings" "sync" + "time" "google.golang.org/api/googleapi" ) @@ -217,12 +218,13 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer // code only. type MediaInfo struct { // At most one of Media and MediaBuffer will be set. - media io.Reader - buffer *MediaBuffer - singleChunk bool - mType string - size int64 // mediaSize, if known. Used only for calls to progressUpdater_. - progressUpdater googleapi.ProgressUpdater + media io.Reader + buffer *MediaBuffer + singleChunk bool + mType string + size int64 // mediaSize, if known. Used only for calls to progressUpdater_. + progressUpdater googleapi.ProgressUpdater + chunkRetryDeadline time.Duration } // NewInfoFromMedia should be invoked from the Media method of a call. It returns a @@ -234,6 +236,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo { if !opts.ForceEmptyContentType { r, mi.mType = DetermineContentType(r, opts.ContentType) } + mi.chunkRetryDeadline = opts.ChunkRetryDeadline mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize) return mi } @@ -356,6 +359,7 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload { mi.progressUpdater(curr, mi.size) } }, + ChunkRetryDeadline: mi.chunkRetryDeadline, } } diff --git a/internal/gensupport/media_test.go b/internal/gensupport/media_test.go index ee4f6d7ace4..c05cc0cd739 100644 --- a/internal/gensupport/media_test.go +++ b/internal/gensupport/media_test.go @@ -14,6 +14,7 @@ import ( "reflect" "strings" "testing" + "time" "google.golang.org/api/googleapi" ) @@ -155,6 +156,7 @@ func TestNewInfoFromMedia(t *testing.T) { opts []googleapi.MediaOption wantType string wantMedia, wantBuffer, wantSingleChunk bool + wantDeadline time.Duration }{ { desc: "an empty reader results in a MediaBuffer with a single, empty chunk", @@ -172,6 +174,15 @@ func TestNewInfoFromMedia(t *testing.T) { wantBuffer: true, wantSingleChunk: true, }, + { + desc: "ChunkRetryDeadline is observed", + r: new(bytes.Buffer), + opts: []googleapi.MediaOption{googleapi.ChunkRetryDeadline(time.Second)}, + wantType: textType, + wantBuffer: true, + wantSingleChunk: true, + wantDeadline: time.Second, + }, { desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk", r: strings.NewReader("12345"), @@ -220,6 +231,9 @@ func TestNewInfoFromMedia(t *testing.T) { if got, want := mi.singleChunk, test.wantSingleChunk; got != want { t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want) } + if got, want := mi.chunkRetryDeadline, test.wantDeadline; got != want { + t.Errorf("%s: chunkRetryDeadline: got %v, want %v", test.desc, got, want) + } } } @@ -341,6 +355,7 @@ func TestResumableUpload(t *testing.T) { chunkSize int wantUploadType string wantResumableUpload bool + chunkRetryDeadline time.Duration }{ { desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk", @@ -372,14 +387,35 @@ func TestResumableUpload(t *testing.T) { wantUploadType: "resumable", wantResumableUpload: true, }, + { + desc: "confirm that ChunkRetryDeadline is carried to ResumableUpload", + r: &nullReader{2 * googleapi.MinUploadChunkSize}, + chunkSize: 1, + wantUploadType: "resumable", + wantResumableUpload: true, + chunkRetryDeadline: 1 * time.Second, + }, } { - mi := NewInfoFromMedia(test.r, []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)}) + opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)} + if test.chunkRetryDeadline != 0 { + opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline)) + } + mi := NewInfoFromMedia(test.r, opts) if got, want := mi.UploadType(), test.wantUploadType; got != want { t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want) } if got, want := mi.ResumableUpload("") != nil, test.wantResumableUpload; got != want { t.Errorf("%s: resumable upload non-nil: got %t, want %t", test.desc, got, want) } + if test.chunkRetryDeadline != 0 { + if got := mi.ResumableUpload(""); got != nil { + if got.ChunkRetryDeadline != test.chunkRetryDeadline { + t.Errorf("%s: ChunkRetryDeadline: got %v, want %v", test.desc, got.ChunkRetryDeadline, test.chunkRetryDeadline) + } + } else { + t.Errorf("%s: test case invalid; resumable upload is nil", test.desc) + } + } } } diff --git a/internal/gensupport/resumable.go b/internal/gensupport/resumable.go index 6796a07c984..ce4272509f6 100644 --- a/internal/gensupport/resumable.go +++ b/internal/gensupport/resumable.go @@ -34,6 +34,10 @@ type ResumableUpload struct { // Retry optionally configures retries for requests made against the upload. Retry *RetryConfig + + // ChunkRetryDeadline configures the per-chunk deadline after which no further + // retries should happen. + ChunkRetryDeadline time.Duration } // Progress returns the number of bytes uploaded at this point. @@ -156,6 +160,14 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err // Configure retryable error criteria. errorFunc := rx.Retry.errorFunc() + // Configure per-chunk retry deadline. + var retryDeadline time.Duration + if rx.ChunkRetryDeadline != 0 { + retryDeadline = rx.ChunkRetryDeadline + } else { + retryDeadline = defaultRetryDeadline + } + // Send all chunks. for { var pause time.Duration diff --git a/internal/gensupport/resumable_test.go b/internal/gensupport/resumable_test.go index bffd88098e2..3d6bc068bab 100644 --- a/internal/gensupport/resumable_test.go +++ b/internal/gensupport/resumable_test.go @@ -305,6 +305,9 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) { ) media := strings.NewReader(strings.Repeat("a", mediaSize)) + // This transport returns multiple errors on both the first chunk and third + // chunk of the upload. If the timeout were not reset between chunks, the + // errors on the third chunk would not retry and cause a failure. tr := &interruptibleTransport{ buf: make([]byte, 0, mediaSize), events: []event{ @@ -320,24 +323,25 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) { // cum: 1s sleep <-- resets because it's a new chunk {"bytes 90-179/*", 308}, // cum: 1s sleep <-- resets because it's a new chunk + {"bytes 180-269/*", http.StatusServiceUnavailable}, + // cum: 1s sleep on later chunk + {"bytes 180-269/*", http.StatusServiceUnavailable}, + // cum: 2s sleep on later chunk {"bytes 180-269/*", 308}, - // cum: 1s sleep <-- resets because it's a new chunk + // cum: 3s sleep <-- resets because it's a new chunk {"bytes 270-299/300", 200}, }, bodies: bodyTracker{}, } rx := &ResumableUpload{ - Client: &http.Client{Transport: tr}, - Media: NewMediaBuffer(media, chunkSize), - MediaType: "text/plain", - Callback: func(int64) {}, + Client: &http.Client{Transport: tr}, + Media: NewMediaBuffer(media, chunkSize), + MediaType: "text/plain", + Callback: func(int64) {}, + ChunkRetryDeadline: 5 * time.Second, } - oldRetryDeadline := retryDeadline - retryDeadline = 5 * time.Second - defer func() { retryDeadline = oldRetryDeadline }() - oldBackoff := backoff backoff = func() Backoff { return new(PauseOneSecond) } defer func() { backoff = oldBackoff }() diff --git a/internal/gensupport/retry.go b/internal/gensupport/retry.go index cea725a3b16..873dab36334 100644 --- a/internal/gensupport/retry.go +++ b/internal/gensupport/retry.go @@ -20,8 +20,8 @@ type Backoff interface { // These are declared as global variables so that tests can overwrite them. var ( - // Per-chunk deadline for resumable uploads. - retryDeadline = 32 * time.Second + // Default per-chunk deadline for resumable uploads. + defaultRetryDeadline = 32 * time.Second // Default backoff timer. backoff = func() Backoff { return &gax.Backoff{Initial: 100 * time.Millisecond}