Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): switch to opt-in retry (#6765)
Browse files Browse the repository at this point in the history
This PR changes the write option from a disable (opt-out) to an
enable (opt-in) for performing retries of appends.  We add additional
guidance to the option indicating its usage.

We also expand the existing large insert integration test to validate
the expected behavior both with and without retries enabled.
  • Loading branch information
shollyman committed Sep 28, 2022
1 parent 76b06c8 commit a3e97a6
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 13 deletions.
2 changes: 0 additions & 2 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -117,8 +117,6 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: createOpenF(ctx, streamFunc),
// We add the new retryer by default, and add a new option to disable it.
retry: newStatelessRetryer(),
}

// apply writer options
Expand Down
84 changes: 79 additions & 5 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) {
// Don't run this in parallel, we only want to collect stats from this subtest.
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
})
t.Run("TestLargeInsert", func(t *testing.T) {
testLargeInsert(ctx, t, mwClient, bqClient, dataset)
t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
})
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
})
})
}
Expand Down Expand Up @@ -596,7 +599,73 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(int64(len(testSimpleData))))
}

func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

// Construct a Very Large request.
var data [][]byte
targetSize := 11 * 1024 * 1024 // 11 MB
b, err := proto.Marshal(testSimpleData[0])
if err != nil {
t.Errorf("failed to marshal message: %v", err)
}

numRows := targetSize / len(b)
data = make([][]byte, numRows)

for i := 0; i < numRows; i++ {
data[i] = b
}

result, err := ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("single append failed: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
apiErr, ok := apierror.FromError(err)
if !ok {
t.Errorf("GetResult error was not an instance of ApiError")
}
status := apiErr.GRPCStatus()
if status.Code() != codes.InvalidArgument {
t.Errorf("expected InvalidArgument status, got %v", status)
}
}
// our next append should fail (we don't have retries enabled).
if _, err = ms.AppendRows(ctx, [][]byte{b}); err == nil {
t.Fatalf("expected second append to fail, got success: %v", err)
}

// The send failure triggers reconnect, so an additional append will succeed.
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("third append expected to succeed, got error: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("failure result from third append: %v", err)
}
}

func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
Expand All @@ -609,6 +678,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
EnableWriteRetries(true),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
Expand Down Expand Up @@ -646,15 +716,19 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
t.Errorf("expected InvalidArgument status, got %v", status)
}
}
// send a subsequent append as verification we can proceed.

// The second append will succeed, but internally will show a retry.
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("subsequent append failed: %v", err)
t.Fatalf("second append expected to succeed, got error: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("failure result from second append: %v", err)
}
if attempts, _ := result.TotalAttempts(ctx); attempts != 2 {
t.Errorf("expected 2 attempts, got %d", attempts)
}
}

func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -527,7 +527,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie
func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount)
pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
if !shouldRetry {
// Should not attempt to re-append.
pw.markDone(appendResp, err, ms.fc)
Expand Down
12 changes: 8 additions & 4 deletions bigquery/storage/managedwriter/options.go
Expand Up @@ -98,11 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
}
}

// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes.
func DisableWriteRetries(disable bool) WriterOption {
// EnableWriteRetries enables ManagedStream to automatically retry failed appends.
//
// Enabling retries is best suited for cases where users want to achieve at-least-once
// append semantics. Use of automatic retries may complicate patterns where the user
// is designing for exactly-once append semantics.
func EnableWriteRetries(enable bool) WriterOption {
return func(ms *ManagedStream) {
if disable {
ms.retry = nil
if enable {
ms.retry = newStatelessRetryer()
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion bigquery/storage/managedwriter/options_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/bigquery/internal"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -122,12 +123,24 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "EnableRetries",
options: []WriterOption{EnableWriteRetries(true)},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.retry = newStatelessRetryer()
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTraceID("traceid"),
EnableWriteRetries(true),
},
want: func() *ManagedStream {
ms := &ManagedStream{
Expand All @@ -136,6 +149,7 @@ func TestWriterOptions(t *testing.T) {
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
ms.retry = newStatelessRetryer()
return ms
}(),
},
Expand All @@ -151,7 +165,8 @@ func TestWriterOptions(t *testing.T) {

if diff := cmp.Diff(got, tc.want,
cmp.AllowUnexported(ManagedStream{}, streamSettings{}),
cmp.AllowUnexported(sync.Mutex{})); diff != "" {
cmp.AllowUnexported(sync.Mutex{}),
cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" {
t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
}
}
Expand Down

0 comments on commit a3e97a6

Please sign in to comment.