From 3b2258c996ca52b76c4e3697706c13da6d6faed1 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Sep 2022 17:13:29 +0000 Subject: [PATCH 1/3] feat(bigquery/storage/managedwriter): switch to opt-in retry This PR changes the write option from a disable (opt-out) to an enable (opt-in) for performing retries of appends. We add additional guidance to the option indicating its usage. We also expand the existing large insert integration test to validate the expected behavior both with and without retries enabled. --- bigquery/storage/managedwriter/client.go | 2 - .../storage/managedwriter/integration_test.go | 86 +++++++++++++++++-- .../storage/managedwriter/managed_stream.go | 2 +- bigquery/storage/managedwriter/options.go | 12 ++- .../storage/managedwriter/options_test.go | 17 +++- 5 files changed, 105 insertions(+), 14 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 4408f1a4094..bc94e3182df 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -117,8 +117,6 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, open: createOpenF(ctx, streamFunc), - // We add the new retryer by default, and add a new option to disable it. - retry: newStatelessRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 9db77641d2f..ba7d4949116 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) { // Don't run this in parallel, we only want to collect stats from this subtest. testInstrumentation(ctx, t, mwClient, bqClient, dataset) }) - t.Run("TestLargeInsert", func(t *testing.T) { - testLargeInsert(ctx, t, mwClient, bqClient, dataset) + t.Run("TestLargeInsertNoRetry", func(t *testing.T) { + testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("TestLargeInsertWithRetry", func(t *testing.T) { + testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset) }) }) } @@ -596,7 +599,73 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl withExactRowCount(int64(len(testSimpleData)))) } -func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { +func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessageProto2{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + // Construct a Very Large request. + var data [][]byte + targetSize := 11 * 1024 * 1024 // 11 MB + b, err := proto.Marshal(testSimpleData[0]) + if err != nil { + t.Errorf("failed to marshal message: %v", err) + } + + numRows := targetSize / len(b) + data = make([][]byte, numRows) + + for i := 0; i < numRows; i++ { + data[i] = b + } + + result, err := ms.AppendRows(ctx, data, WithOffset(0)) + if err != nil { + t.Errorf("single append failed: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + apiErr, ok := apierror.FromError(err) + if !ok { + t.Errorf("GetResult error was not an instance of ApiError") + } + status := apiErr.GRPCStatus() + if status.Code() != codes.InvalidArgument { + t.Errorf("expected InvalidArgument status, got %v", status) + } + } + // our next append should fail (we don't have retries enabled). + if _, err = ms.AppendRows(ctx, [][]byte{b}); err == nil { + t.Fatalf("expected second append to fail, got success: %v", err) + } + + // The send failure triggers reconnect, so an additional append will succeed. + result, err = ms.AppendRows(ctx, [][]byte{b}) + if err != nil { + t.Fatalf("third append expected to succeed, got error: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("failure result from third append: %v", err) + } +} + +func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) @@ -609,6 +678,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(CommittedStream), WithSchemaDescriptor(descriptorProto), + EnableWriteRetries(true), ) if err != nil { t.Fatalf("NewManagedStream: %v", err) @@ -646,14 +716,18 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie t.Errorf("expected InvalidArgument status, got %v", status) } } - // send a subsequent append as verification we can proceed. + + // The second append will succeed, but internally will show a retry. result, err = ms.AppendRows(ctx, [][]byte{b}) if err != nil { - t.Fatalf("subsequent append failed: %v", err) + t.Fatalf("third append expected to succeed, got error: %v", err) } _, err = result.GetResult(ctx) if err != nil { - t.Errorf("failure result from second append: %v", err) + t.Errorf("failure result from third append: %v", err) + } + if attempts, _ := result.TotalAttempts(ctx); attempts != 2 { + t.Errorf("expected 2 attempts, got %d", attempts) } } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index c1efb18b576..8ccae000ab5 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -527,7 +527,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) { err := initialErr for { - pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount) + pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount) if !shouldRetry { // Should not attempt to re-append. pw.markDone(appendResp, err, ms.fc) diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 341c31e2636..644cf0705c3 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,11 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } -// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes. -func DisableWriteRetries(disable bool) WriterOption { +// EnableWriteRetry enables ManagedStream to automatically retry failed appends. +// +// Enabling retries is best suited for cases where users want to achieve at-least-once +// append semantics. Use of automatic retries may complicate patterns where the user +// is designing for exactly-once append semantics. +func EnableWriteRetries(enable bool) WriterOption { return func(ms *ManagedStream) { - if disable { - ms.retry = nil + if enable { + ms.retry = newStatelessRetryer() } } } diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go index 03551a9600d..ad7cf96c47f 100644 --- a/bigquery/storage/managedwriter/options_test.go +++ b/bigquery/storage/managedwriter/options_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/bigquery/internal" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2" "google.golang.org/grpc" ) @@ -122,12 +123,24 @@ func TestWriterOptions(t *testing.T) { return ms }(), }, + { + desc: "EnableRetries", + options: []WriterOption{EnableWriteRetries(true)}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + } + ms.retry = newStatelessRetryer() + return ms + }(), + }, { desc: "multiple", options: []WriterOption{ WithType(PendingStream), WithMaxInflightBytes(5), WithTraceID("traceid"), + EnableWriteRetries(true), }, want: func() *ManagedStream { ms := &ManagedStream{ @@ -136,6 +149,7 @@ func TestWriterOptions(t *testing.T) { ms.streamSettings.MaxInflightBytes = 5 ms.streamSettings.streamType = PendingStream ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version) + ms.retry = newStatelessRetryer() return ms }(), }, @@ -151,7 +165,8 @@ func TestWriterOptions(t *testing.T) { if diff := cmp.Diff(got, tc.want, cmp.AllowUnexported(ManagedStream{}, streamSettings{}), - cmp.AllowUnexported(sync.Mutex{})); diff != "" { + cmp.AllowUnexported(sync.Mutex{}), + cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } } From d7cd9ac0baed43c9923e315dc996bcf65df55cdf Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Sep 2022 17:18:06 +0000 Subject: [PATCH 2/3] correct error string --- bigquery/storage/managedwriter/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index ba7d4949116..ec7c0c22520 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -720,11 +720,11 @@ func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Clien // The second append will succeed, but internally will show a retry. result, err = ms.AppendRows(ctx, [][]byte{b}) if err != nil { - t.Fatalf("third append expected to succeed, got error: %v", err) + t.Fatalf("second append expected to succeed, got error: %v", err) } _, err = result.GetResult(ctx) if err != nil { - t.Errorf("failure result from third append: %v", err) + t.Errorf("failure result from second append: %v", err) } if attempts, _ := result.TotalAttempts(ctx); attempts != 2 { t.Errorf("expected 2 attempts, got %d", attempts) From 59968a851d1642bcdd302ed7e12524254fcc35b3 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Sep 2022 17:37:50 +0000 Subject: [PATCH 3/3] lint --- bigquery/storage/managedwriter/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 644cf0705c3..342a04b3c1b 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,7 +98,7 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } -// EnableWriteRetry enables ManagedStream to automatically retry failed appends. +// EnableWriteRetries enables ManagedStream to automatically retry failed appends. // // Enabling retries is best suited for cases where users want to achieve at-least-once // append semantics. Use of automatic retries may complicate patterns where the user