From a3e97a6f15ad1989ef815b9bd5838192f9f226f1 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 28 Sep 2022 10:58:18 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): switch to opt-in retry (#6765) This PR changes the write option from a disable (opt-out) to an enable (opt-in) for performing retries of appends. We add additional guidance to the option indicating its usage. We also expand the existing large insert integration test to validate the expected behavior both with and without retries enabled. --- bigquery/storage/managedwriter/client.go | 2 - .../storage/managedwriter/integration_test.go | 84 +++++++++++++++++-- .../storage/managedwriter/managed_stream.go | 2 +- bigquery/storage/managedwriter/options.go | 12 ++- .../storage/managedwriter/options_test.go | 17 +++- 5 files changed, 104 insertions(+), 13 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 4408f1a4094..bc94e3182df 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -117,8 +117,6 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, open: createOpenF(ctx, streamFunc), - // We add the new retryer by default, and add a new option to disable it. - retry: newStatelessRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 9db77641d2f..ec7c0c22520 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) { // Don't run this in parallel, we only want to collect stats from this subtest. testInstrumentation(ctx, t, mwClient, bqClient, dataset) }) - t.Run("TestLargeInsert", func(t *testing.T) { - testLargeInsert(ctx, t, mwClient, bqClient, dataset) + t.Run("TestLargeInsertNoRetry", func(t *testing.T) { + testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("TestLargeInsertWithRetry", func(t *testing.T) { + testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset) }) }) } @@ -596,7 +599,73 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl withExactRowCount(int64(len(testSimpleData)))) } -func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { +func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + m := &testdata.SimpleMessageProto2{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + // Construct a Very Large request. + var data [][]byte + targetSize := 11 * 1024 * 1024 // 11 MB + b, err := proto.Marshal(testSimpleData[0]) + if err != nil { + t.Errorf("failed to marshal message: %v", err) + } + + numRows := targetSize / len(b) + data = make([][]byte, numRows) + + for i := 0; i < numRows; i++ { + data[i] = b + } + + result, err := ms.AppendRows(ctx, data, WithOffset(0)) + if err != nil { + t.Errorf("single append failed: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + apiErr, ok := apierror.FromError(err) + if !ok { + t.Errorf("GetResult error was not an instance of ApiError") + } + status := apiErr.GRPCStatus() + if status.Code() != codes.InvalidArgument { + t.Errorf("expected InvalidArgument status, got %v", status) + } + } + // our next append should fail (we don't have retries enabled). + if _, err = ms.AppendRows(ctx, [][]byte{b}); err == nil { + t.Fatalf("expected second append to fail, got success: %v", err) + } + + // The send failure triggers reconnect, so an additional append will succeed. + result, err = ms.AppendRows(ctx, [][]byte{b}) + if err != nil { + t.Fatalf("third append expected to succeed, got error: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("failure result from third append: %v", err) + } +} + +func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) @@ -609,6 +678,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(CommittedStream), WithSchemaDescriptor(descriptorProto), + EnableWriteRetries(true), ) if err != nil { t.Fatalf("NewManagedStream: %v", err) @@ -646,15 +716,19 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie t.Errorf("expected InvalidArgument status, got %v", status) } } - // send a subsequent append as verification we can proceed. + + // The second append will succeed, but internally will show a retry. result, err = ms.AppendRows(ctx, [][]byte{b}) if err != nil { - t.Fatalf("subsequent append failed: %v", err) + t.Fatalf("second append expected to succeed, got error: %v", err) } _, err = result.GetResult(ctx) if err != nil { t.Errorf("failure result from second append: %v", err) } + if attempts, _ := result.TotalAttempts(ctx); attempts != 2 { + t.Errorf("expected 2 attempts, got %d", attempts) + } } func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index c1efb18b576..8ccae000ab5 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -527,7 +527,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) { err := initialErr for { - pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount) + pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount) if !shouldRetry { // Should not attempt to re-append. pw.markDone(appendResp, err, ms.fc) diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 341c31e2636..342a04b3c1b 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,11 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } -// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes. -func DisableWriteRetries(disable bool) WriterOption { +// EnableWriteRetries enables ManagedStream to automatically retry failed appends. +// +// Enabling retries is best suited for cases where users want to achieve at-least-once +// append semantics. Use of automatic retries may complicate patterns where the user +// is designing for exactly-once append semantics. +func EnableWriteRetries(enable bool) WriterOption { return func(ms *ManagedStream) { - if disable { - ms.retry = nil + if enable { + ms.retry = newStatelessRetryer() } } } diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go index 03551a9600d..ad7cf96c47f 100644 --- a/bigquery/storage/managedwriter/options_test.go +++ b/bigquery/storage/managedwriter/options_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/bigquery/internal" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2" "google.golang.org/grpc" ) @@ -122,12 +123,24 @@ func TestWriterOptions(t *testing.T) { return ms }(), }, + { + desc: "EnableRetries", + options: []WriterOption{EnableWriteRetries(true)}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + } + ms.retry = newStatelessRetryer() + return ms + }(), + }, { desc: "multiple", options: []WriterOption{ WithType(PendingStream), WithMaxInflightBytes(5), WithTraceID("traceid"), + EnableWriteRetries(true), }, want: func() *ManagedStream { ms := &ManagedStream{ @@ -136,6 +149,7 @@ func TestWriterOptions(t *testing.T) { ms.streamSettings.MaxInflightBytes = 5 ms.streamSettings.streamType = PendingStream ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version) + ms.retry = newStatelessRetryer() return ms }(), }, @@ -151,7 +165,8 @@ func TestWriterOptions(t *testing.T) { if diff := cmp.Diff(got, tc.want, cmp.AllowUnexported(ManagedStream{}, streamSettings{}), - cmp.AllowUnexported(sync.Mutex{})); diff != "" { + cmp.AllowUnexported(sync.Mutex{}), + cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } }