diff --git a/bigquery/bigquery.go b/bigquery/bigquery.go index c95ec5435ba..7170f73c323 100644 --- a/bigquery/bigquery.go +++ b/bigquery/bigquery.go @@ -102,7 +102,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio // large datasets from tables, jobs or queries. // Calling this method twice will return an error. func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error { - if c.rc != nil { + if c.isStorageReadAvailable() { return fmt.Errorf("failed: storage read client already set up") } rc, err := newReadClient(ctx, c.projectID, opts...) @@ -113,6 +113,10 @@ func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.Cli return nil } +func (c *Client) isStorageReadAvailable() bool { + return c.rc != nil +} + // Project returns the project ID or number for this instance of the client, which may have // either been explicitly specified or autodetected. func (c *Client) Project() string { @@ -123,7 +127,7 @@ func (c *Client) Project() string { // Close should be called when the client is no longer needed. // It need not be called at program exit. 
func (c *Client) Close() error { - if c.rc != nil { + if c.isStorageReadAvailable() { err := c.rc.close() if err != nil { return err diff --git a/bigquery/job.go b/bigquery/job.go index 1319e322cf4..e6be579c07a 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -322,7 +322,7 @@ func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, strin return nil, err } var it *RowIterator - if j.c.rc != nil { + if j.c.isStorageReadAvailable() { it, err = newStorageRowIteratorFromJob(ctx, j) if err != nil { it = nil diff --git a/bigquery/query.go b/bigquery/query.go index 95f5565aa00..7ac35f88af3 100644 --- a/bigquery/query.go +++ b/bigquery/query.go @@ -402,13 +402,8 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { if resp.JobComplete { // If more pages are available, discard and use the Storage API instead - if resp.PageToken != "" && q.client.rc != nil { - // Needed to fetch destination table - job, err := q.client.JobFromID(ctx, resp.JobReference.JobId) - if err != nil { - return nil, err - } - it, err = newStorageRowIteratorFromJob(ctx, job) + if resp.PageToken != "" && q.client.isStorageReadAvailable() { + it, err = newStorageRowIteratorFromJob(ctx, minimalJob) if err == nil { return it, nil } @@ -439,7 +434,7 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { // user's Query configuration. If all the options set on the job are supported on the // faster query path, this method returns a QueryRequest suitable for execution. 
func (q *Query) probeFastPath() (*bq.QueryRequest, error) { - if q.forceStorageAPI && q.client.rc != nil { + if q.forceStorageAPI && q.client.isStorageReadAvailable() { return nil, fmt.Errorf("force Storage API usage") } // This is a denylist of settings which prevent us from composing an equivalent diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 6def5d51b03..b0f66430cfb 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -272,6 +272,7 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) { sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) // Don't forceStorageAPI usage and still see internally Storage API is selected q := storageOptimizedClient.Query(sql) + q.DisableQueryCache = true it, err := q.Read(ctx) if err != nil { t.Fatal(err) @@ -304,12 +305,13 @@ func TestIntegration_StorageReadCancel(t *testing.T) { t.Skip("Integration tests skipped") } ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithCancel(ctx) defer cancel() table := "`bigquery-public-data.samples.github_timeline`" sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) storageOptimizedClient.rc.settings.maxWorkerCount = 1 q := storageOptimizedClient.Query(sql) + q.DisableQueryCache = true q.forceStorageAPI = true it, err := q.Read(ctx) if err != nil { @@ -319,6 +321,8 @@ func TestIntegration_StorageReadCancel(t *testing.T) { t.Fatal("expected query to use Storage API") } + // Cancel read after reading 1000 rows + rowsRead := 0 for { var dst []Value err := it.Next(&dst) @@ -326,14 +330,19 @@ func TestIntegration_StorageReadCancel(t *testing.T) { break } if err != nil { - if errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, context.Canceled) { 
break } t.Fatalf("failed to fetch via storage API: %v", err) } + rowsRead++ + if rowsRead > 1000 { + cancel() + } } // resources are cleaned asynchronously - time.Sleep(500 * time.Millisecond) + time.Sleep(time.Second) if !it.arrowIterator.isDone() { t.Fatal("expected stream to be done") } diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go index 702b90b6769..1c7fa3cc6c5 100644 --- a/bigquery/storage_iterator.go +++ b/bigquery/storage_iterator.go @@ -64,7 +64,12 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b return it, nil } -func newStorageRowIteratorFromJob(ctx context.Context, job *Job) (*RowIterator, error) { +func newStorageRowIteratorFromJob(ctx context.Context, j *Job) (*RowIterator, error) { + // Needed to fetch destination table + job, err := j.c.JobFromID(ctx, j.jobID) + if err != nil { + return nil, err + } cfg, err := job.Config() if err != nil { return nil, err diff --git a/bigquery/table.go b/bigquery/table.go index f0ec2929792..fc0e1e3029b 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -813,7 +813,7 @@ func (t *Table) Read(ctx context.Context) *RowIterator { } func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { - if t.c.rc != nil { + if t.c.isStorageReadAvailable() { it, err := newStorageRowIteratorFromTable(ctx, t, false) if err == nil { return it