Skip to content

Commit

Permalink
Fix: bigquery.create NullPointerException when job already exists (#3035
Browse files Browse the repository at this point in the history
)

* 3034: Reproduce and propose fix

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Louis-Maxime Crédeville <Lumenol@users.noreply.github.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Dec 13, 2023
1 parent f64326a commit 38191b1
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 61 deletions.
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -53,20 +53,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.27.0')
implementation platform('com.google.cloud:libraries-bom:26.29.0')
implementation 'com.google.cloud:google-cloud-bigquery'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquery:2.34.2'
implementation 'com.google.cloud:google-cloud-bigquery:2.35.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.34.2"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.35.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -351,7 +351,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquery/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquery.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquery/2.34.2
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquery/2.35.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Expand Up @@ -437,7 +437,7 @@ public com.google.api.services.bigquery.model.Job call() {

if (matcher.find()) {
// If the Job ALREADY EXISTS, retrieve it.
Job job = this.getJob(jobInfo.getJobId());
Job job = this.getJob(jobInfo.getJobId(), JobOption.fields(JobField.STATISTICS));

long jobCreationTime = job.getStatistics().getCreationTime();
long jobMinStaleTime = System.currentTimeMillis();
Expand Down
Expand Up @@ -16,47 +16,27 @@

package com.google.cloud.bigquery;

import static com.google.cloud.bigquery.BigQuery.JobField.STATISTICS;
import static com.google.cloud.bigquery.BigQuery.JobField.USER_EMAIL;
import static com.google.cloud.bigquery.BigQueryImpl.optionMap;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.*;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.cloud.Policy;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery.JobOption;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.*;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collections;
Expand Down Expand Up @@ -422,12 +402,11 @@ public class BigQueryImplTest {
BigQueryRpc.Option.START_INDEX, 0L);

// Job options
private static final BigQuery.JobOption JOB_OPTION_FIELDS =
BigQuery.JobOption.fields(BigQuery.JobField.USER_EMAIL);
private static final JobOption JOB_OPTION_FIELDS = JobOption.fields(USER_EMAIL);

// Job list options
private static final BigQuery.JobListOption JOB_LIST_OPTION_FIELD =
BigQuery.JobListOption.fields(BigQuery.JobField.STATISTICS);
BigQuery.JobListOption.fields(STATISTICS);
private static final BigQuery.JobListOption JOB_LIST_ALL_USERS =
BigQuery.JobListOption.allUsers();
private static final BigQuery.JobListOption JOB_LIST_STATE_FILTER =
Expand Down Expand Up @@ -1615,7 +1594,7 @@ public void testCreateJobWithSelectedFields() {
any(com.google.api.services.bigquery.model.Job.class), capturedOptions.capture()))
.thenReturn(newJobPb());

BigQuery.JobOption jobOptions = BigQuery.JobOption.fields(BigQuery.JobField.USER_EMAIL);
JobOption jobOptions = JobOption.fields(USER_EMAIL);

bigquery = options.getService();
bigquery.create(JobInfo.of(QueryJobConfiguration.of("SOME QUERY")), jobOptions);
Expand Down Expand Up @@ -1674,6 +1653,35 @@ public JobId get() {
.getJob(any(String.class), eq(id), eq((String) null), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobTryGetNotRandom() {
Map<BigQueryRpc.Option, ?> withStatisticOption = optionMap(JobOption.fields(STATISTICS));
final String id = "testCreateJobTryGet-id";
String query = "SELECT * in FOO";

when(bigqueryRpcMock.create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)))
.thenThrow(
new BigQueryException(
409,
"already exists, for some reason",
new RuntimeException("Already Exists: Job")));
when(bigqueryRpcMock.getJob(
any(String.class), eq(id), eq((String) null), eq(withStatisticOption)))
.thenReturn(
newJobPb()
.setId(id)
.setStatistics(new JobStatistics().setCreationTime(System.currentTimeMillis())));

bigquery = options.getService();
Job job =
((BigQueryImpl) bigquery).create(JobInfo.of(JobId.of(id), QueryJobConfiguration.of(query)));
assertThat(job).isNotNull();
assertThat(jobCapture.getValue().getJobReference().getJobId()).isEqualTo(id);
verify(bigqueryRpcMock).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
verify(bigqueryRpcMock)
.getJob(any(String.class), eq(id), eq((String) null), eq(withStatisticOption));
}

@Test
public void testCreateJobWithProjectId() {
JobInfo jobInfo =
Expand Down Expand Up @@ -1925,7 +1933,7 @@ public void testQueryRequestCompleted() throws InterruptedException {
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.thenReturn(jobResponsePb);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb);
when(bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
Expand All @@ -1946,8 +1954,7 @@ public void testQueryRequestCompleted() throws InterruptedException {
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));

verify(bigqueryRpcMock)
.listTableData(PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
Expand Down Expand Up @@ -2003,10 +2010,7 @@ public void testFastQueryMultiplePages() throws InterruptedException {
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR))))
PROJECT, DATASET, TABLE, optionMap(BigQuery.TableDataListOption.pageToken(CURSOR))))
.thenReturn(
new TableDataList()
.setPageToken(CURSOR)
Expand Down Expand Up @@ -2044,10 +2048,7 @@ public void testFastQueryMultiplePages() throws InterruptedException {
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
PROJECT, DATASET, TABLE, optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

Expand Down Expand Up @@ -2084,7 +2085,7 @@ public void testFastQuerySlowDdl() throws InterruptedException {
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(queryResultsResponsePb);
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
.thenReturn(new TableDataList().setRows(ImmutableList.of(TABLE_ROW)).setTotalRows(1L));
Expand All @@ -2108,8 +2109,7 @@ public void testFastQuerySlowDdl() throws InterruptedException {
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS);
}

Expand Down Expand Up @@ -2143,7 +2143,7 @@ public void testQueryRequestCompletedOptions() throws InterruptedException {
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());

when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb);
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap))
.thenReturn(
Expand All @@ -2164,8 +2164,7 @@ public void testQueryRequestCompletedOptions() throws InterruptedException {
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, optionMap);
}

Expand Down Expand Up @@ -2199,10 +2198,10 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.thenReturn(jobResponsePb1);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb1);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb2);
when(bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
Expand All @@ -2223,11 +2222,9 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock)
.listTableData(PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
}
Expand Down

0 comments on commit 38191b1

Please sign in to comment.