diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
index 4408f1a4094..bc94e3182df 100644
--- a/bigquery/storage/managedwriter/client.go
+++ b/bigquery/storage/managedwriter/client.go
@@ -117,8 +117,6 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
 			gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
 		},
 		open: createOpenF(ctx, streamFunc),
-		// We add the new retryer by default, and add a new option to disable it.
-		retry: newStatelessRetryer(),
 	}
 
 	// apply writer options
diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
index 9db77641d2f..ec7c0c22520 100644
--- a/bigquery/storage/managedwriter/integration_test.go
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) {
 			// Don't run this in parallel, we only want to collect stats from this subtest.
 			testInstrumentation(ctx, t, mwClient, bqClient, dataset)
 		})
-		t.Run("TestLargeInsert", func(t *testing.T) {
-			testLargeInsert(ctx, t, mwClient, bqClient, dataset)
+		t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
+			testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
+		})
+		t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
+			testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
 		})
 	})
 }
@@ -596,7 +599,81 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
 		withExactRowCount(int64(len(testSimpleData))))
 }
 
-func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
+// testLargeInsertNoRetry sends a very large (~11 MB) append on a stream created
+// without EnableWriteRetries, expects the following append to fail, and then
+// checks that a third append succeeds.
+func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
+	testTable := dataset.Table(tableIDs.New())
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+
+	m := &testdata.SimpleMessageProto2{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(CommittedStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
+		withExactRowCount(0))
+
+	// Construct a Very Large request.
+	var data [][]byte
+	targetSize := 11 * 1024 * 1024 // 11 MB
+	b, err := proto.Marshal(testSimpleData[0])
+	if err != nil {
+		// Fatal: the row count below divides by len(b).
+		t.Fatalf("failed to marshal message: %v", err)
+	}
+
+	numRows := targetSize / len(b)
+	data = make([][]byte, numRows)
+
+	for i := 0; i < numRows; i++ {
+		data[i] = b
+	}
+
+	result, err := ms.AppendRows(ctx, data, WithOffset(0))
+	if err != nil {
+		// Fatal: result is not meaningful when AppendRows returns an error.
+		t.Fatalf("single append failed: %v", err)
+	}
+	_, err = result.GetResult(ctx)
+	if err != nil {
+		apiErr, ok := apierror.FromError(err)
+		if !ok {
+			// Fatal: apiErr is not usable below when the conversion fails.
+			t.Fatalf("GetResult error was not an instance of ApiError")
+		}
+		status := apiErr.GRPCStatus()
+		if status.Code() != codes.InvalidArgument {
+			t.Errorf("expected InvalidArgument status, got %v", status)
+		}
+	}
+	// Our next append should fail (we don't have retries enabled).
+	if _, err = ms.AppendRows(ctx, [][]byte{b}); err == nil {
+		t.Fatalf("expected second append to fail, got success")
+	}
+
+	// The send failure triggers reconnect, so an additional append will succeed.
+	result, err = ms.AppendRows(ctx, [][]byte{b})
+	if err != nil {
+		t.Fatalf("third append expected to succeed, got error: %v", err)
+	}
+	_, err = result.GetResult(ctx)
+	if err != nil {
+		t.Errorf("failure result from third append: %v", err)
+	}
+}
+
+// testLargeInsertWithRetry uses a stream created with EnableWriteRetries(true) and
+// expects the append after the large insert to succeed with exactly two attempts.
+func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
 	testTable := dataset.Table(tableIDs.New())
 	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
 		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
@@ -609,6 +686,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
 		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
 		WithType(CommittedStream),
 		WithSchemaDescriptor(descriptorProto),
+		EnableWriteRetries(true),
 	)
 	if err != nil {
 		t.Fatalf("NewManagedStream: %v", err)
@@ -646,15 +724,19 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
 			t.Errorf("expected InvalidArgument status, got %v", status)
 		}
 	}
-	// send a subsequent append as verification we can proceed.
+
+	// The second append will succeed, but internally will show a retry.
 	result, err = ms.AppendRows(ctx, [][]byte{b})
 	if err != nil {
-		t.Fatalf("subsequent append failed: %v", err)
+		t.Fatalf("second append expected to succeed, got error: %v", err)
 	}
 	_, err = result.GetResult(ctx)
 	if err != nil {
 		t.Errorf("failure result from second append: %v", err)
 	}
+	if attempts, err := result.TotalAttempts(ctx); err != nil || attempts != 2 {
+		t.Errorf("expected 2 attempts, got %d (err: %v)", attempts, err)
+	}
 }
 
 func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index c1efb18b576..8ccae000ab5 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -527,7 +527,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie
 func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
 	err := initialErr
 	for {
-		pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount)
+		pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
 		if !shouldRetry {
 			// Should not attempt to re-append.
 			pw.markDone(appendResp, err, ms.fc)
diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go
index 341c31e2636..342a04b3c1b 100644
--- a/bigquery/storage/managedwriter/options.go
+++ b/bigquery/storage/managedwriter/options.go
@@ -98,11 +98,17 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
 	}
 }
 
-// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes.
-func DisableWriteRetries(disable bool) WriterOption {
+// EnableWriteRetries enables ManagedStream to automatically retry failed appends.
+//
+// Enabling retries is best suited for cases where users want to achieve at-least-once
+// append semantics. Use of automatic retries may complicate patterns where the user
+// is designing for exactly-once append semantics.
+//
+// Passing false leaves the stream's retry setting unchanged.
+func EnableWriteRetries(enable bool) WriterOption {
 	return func(ms *ManagedStream) {
-		if disable {
-			ms.retry = nil
+		if enable {
+			ms.retry = newStatelessRetryer()
 		}
 	}
 }
diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go
index 03551a9600d..ad7cf96c47f 100644
--- a/bigquery/storage/managedwriter/options_test.go
+++ b/bigquery/storage/managedwriter/options_test.go
@@ -21,6 +21,7 @@ import (
 
 	"cloud.google.com/go/bigquery/internal"
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/googleapis/gax-go/v2"
 	"google.golang.org/grpc"
 )
@@ -122,12 +123,24 @@ func TestWriterOptions(t *testing.T) {
 				return ms
 			}(),
 		},
+		{
+			desc:    "EnableRetries",
+			options: []WriterOption{EnableWriteRetries(true)},
+			want: func() *ManagedStream {
+				ms := &ManagedStream{
+					streamSettings: defaultStreamSettings(),
+				}
+				ms.retry = newStatelessRetryer()
+				return ms
+			}(),
+		},
 		{
 			desc: "multiple",
 			options: []WriterOption{
 				WithType(PendingStream),
 				WithMaxInflightBytes(5),
 				WithTraceID("traceid"),
+				EnableWriteRetries(true),
 			},
 			want: func() *ManagedStream {
 				ms := &ManagedStream{
@@ -136,6 +149,7 @@ func TestWriterOptions(t *testing.T) {
 				ms.streamSettings.MaxInflightBytes = 5
 				ms.streamSettings.streamType = PendingStream
 				ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
+				ms.retry = newStatelessRetryer()
 				return ms
 			}(),
 		},
@@ -151,7 +165,8 @@ func TestWriterOptions(t *testing.T) {
 
 		if diff := cmp.Diff(got, tc.want,
 			cmp.AllowUnexported(ManagedStream{}, streamSettings{}),
-			cmp.AllowUnexported(sync.Mutex{})); diff != "" {
+			cmp.AllowUnexported(sync.Mutex{}),
+			cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" {
 			t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
 		}
 	}