Skip to content

Commit

Permalink
fix(bigquery): handle storage read api Recv call errors (#8666)
Browse files Browse the repository at this point in the history
The initial bug was found when the Storage Read API is called under a more restrictive IAM role, which can allow a user to create a ReadSession but not read from it (missing the `bigquery.readsessions.getData` permission). This caused the process of reading the `read_streams` to enter a retry loop, because errors coming from the `Recv` calls were not handled properly — only those from the `ReadRows` call. This PR fixes that behavior.

This was reported in #8660 and tested locally by creating a custom role with the following configuration:

![image](https://togithub.com/googleapis/google-cloud-go/assets/1615543/b6dfdecf-5bb0-497f-8fcb-df8a8bdf1e3b)

Example of error:
```
failed to fetch via storage API: failed to read rows on stream projects/xxx/locations/us/sessions/yyy/streams/zzz: failed to consume rows on stream projects/xxx/locations/us/sessions/yyy/streams/zzz: rpc error: code = PermissionDenied desc = there was an error operating on 'projects/xxx/locations/us/sessions/yyy/streams/zzz': the user does not have 'bigquery.readsessions.getData' permission for 'projects/xxx/locations/us/sessions/yyy/streams/zzz
```

With the fix in this PR, processing of the stream now stops and errors can be returned (such as the PERMISSION_DENIED error in this scenario).
  • Loading branch information
alvarowolfx committed Oct 12, 2023
1 parent 65cb8bd commit c73963f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 32 deletions.
8 changes: 8 additions & 0 deletions bigquery/storage_iterator.go
Expand Up @@ -270,6 +270,14 @@ func (it *arrowIterator) processStream(readStream string) {
if it.session.ctx.Err() != nil { // context cancelled, don't queue error
return
}
backoff, shouldRetry := retryReadRows(bo, err)
if shouldRetry {
if err := gax.Sleep(it.ctx, backoff); err != nil {
return // context cancelled
}
continue
}
it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
// try to re-open row stream with updated offset
}
}
Expand Down
87 changes: 55 additions & 32 deletions bigquery/storage_iterator_test.go
Expand Up @@ -39,7 +39,7 @@ func TestStorageIteratorRetry(t *testing.T) {
}{
{
desc: "no error",
errors: []error{nil},
errors: []error{},
wantFail: false,
},
{
Expand All @@ -49,10 +49,16 @@ func TestStorageIteratorRetry(t *testing.T) {
status.Errorf(codes.Unavailable, "try 2"),
status.Errorf(codes.Canceled, "try 3"),
status.Errorf(codes.Internal, "try 4"),
nil,
},
wantFail: false,
},
{
desc: "not enough permission",
errors: []error{
status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"),
},
wantFail: true,
},
{
desc: "permanent error",
errors: []error{
Expand All @@ -71,18 +77,12 @@ func TestStorageIteratorRetry(t *testing.T) {
wantFail: true,
},
}

for _, tc := range testCases {
baseCtx := tc.ctx
if baseCtx == nil {
baseCtx = context.Background()
rrc := &testReadRowsClient{
errors: tc.errors,
}
ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
defer cancel()
it, err := newRawStorageRowIterator(&readSession{
ctx: ctx,
settings: defaultReadClientSettings(),
readRowsFunc: func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){
"readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
if len(tc.errors) == 0 {
return &testReadRowsClient{}, nil
}
Expand All @@ -93,39 +93,62 @@ func TestStorageIteratorRetry(t *testing.T) {
}
return &testReadRowsClient{}, nil
},
bqSession: &storagepb.ReadSession{},
})
if err != nil {
t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
"readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
return rrc, nil
},
}
for readRowsFuncType, readRowsFunc := range readRowsFuncs {
baseCtx := tc.ctx
if baseCtx == nil {
baseCtx = context.Background()
}
ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
defer cancel()

it.processStream("test-stream")
it, err := newRawStorageRowIterator(&readSession{
ctx: ctx,
settings: defaultReadClientSettings(),
readRowsFunc: readRowsFunc,
bqSession: &storagepb.ReadSession{},
})
if err != nil {
t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
}

it.processStream("test-stream")

if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
if tc.wantFail {
continue
if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
if tc.wantFail {
continue
}
t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType)
}
if tc.wantFail && len(it.errs) == 0 {
t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType)
}
if !tc.wantFail && len(it.errs) > 0 {
t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs))
}
t.Fatalf("case %s: deadline exceeded", tc.desc)
}
if tc.wantFail && len(it.errs) == 0 {
t.Fatalf("case %s:want test to fail, but found no errors", tc.desc)
}
if !tc.wantFail && len(it.errs) > 0 {
t.Fatalf("case %s:test should not fail, but found %d errors", tc.desc, len(it.errs))
}
}
}

// testReadRowsClient is a fake BigQueryRead_ReadRowsClient for tests.
// Its Recv method serves the queued errors first, then the queued
// responses, and finally io.EOF — letting tests simulate a stream whose
// Recv calls fail (e.g. with PERMISSION_DENIED) before yielding rows.
type testReadRowsClient struct {
	// Embedded so the fake satisfies the full stream interface; tests
	// only exercise Recv.
	storagepb.BigQueryRead_ReadRowsClient
	// responses are returned by Recv, in order, after errors is drained.
	responses []*storagepb.ReadRowsResponse
	// errors are returned by Recv, in order, before any responses.
	errors []error
}

func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) {
if len(trrc.responses) == 0 {
return nil, io.EOF
if len(trrc.errors) > 0 {
err := trrc.errors[0]
trrc.errors = trrc.errors[1:]
return nil, err
}
if len(trrc.responses) > 0 {
r := trrc.responses[0]
trrc.responses = trrc.responses[:1]
return r, nil
}
r := trrc.responses[0]
trrc.responses = trrc.responses[:1]
return r, nil
return nil, io.EOF
}

0 comments on commit c73963f

Please sign in to comment.