Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: bigquery.create NullPointerException when job already exists #3035

Merged
merged 2 commits into from Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -53,20 +53,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.27.0')
implementation platform('com.google.cloud:libraries-bom:26.29.0')
implementation 'com.google.cloud:google-cloud-bigquery'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquery:2.34.2'
implementation 'com.google.cloud:google-cloud-bigquery:2.35.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.34.2"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.35.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -351,7 +351,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquery/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquery.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquery/2.34.2
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquery/2.35.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Expand Up @@ -437,7 +437,7 @@ public com.google.api.services.bigquery.model.Job call() {

if (matcher.find()) {
// If the Job ALREADY EXISTS, retrieve it.
Job job = this.getJob(jobInfo.getJobId());
Job job = this.getJob(jobInfo.getJobId(), JobOption.fields(JobField.STATISTICS));

long jobCreationTime = job.getStatistics().getCreationTime();
long jobMinStaleTime = System.currentTimeMillis();
Expand Down
Expand Up @@ -16,47 +16,27 @@

package com.google.cloud.bigquery;

import static com.google.cloud.bigquery.BigQuery.JobField.STATISTICS;
import static com.google.cloud.bigquery.BigQuery.JobField.USER_EMAIL;
import static com.google.cloud.bigquery.BigQueryImpl.optionMap;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.*;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.cloud.Policy;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery.JobOption;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.*;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Collections;
Expand Down Expand Up @@ -422,12 +402,11 @@ public class BigQueryImplTest {
BigQueryRpc.Option.START_INDEX, 0L);

// Job options
private static final BigQuery.JobOption JOB_OPTION_FIELDS =
BigQuery.JobOption.fields(BigQuery.JobField.USER_EMAIL);
private static final JobOption JOB_OPTION_FIELDS = JobOption.fields(USER_EMAIL);

// Job list options
private static final BigQuery.JobListOption JOB_LIST_OPTION_FIELD =
BigQuery.JobListOption.fields(BigQuery.JobField.STATISTICS);
BigQuery.JobListOption.fields(STATISTICS);
private static final BigQuery.JobListOption JOB_LIST_ALL_USERS =
BigQuery.JobListOption.allUsers();
private static final BigQuery.JobListOption JOB_LIST_STATE_FILTER =
Expand Down Expand Up @@ -1615,7 +1594,7 @@ public void testCreateJobWithSelectedFields() {
any(com.google.api.services.bigquery.model.Job.class), capturedOptions.capture()))
.thenReturn(newJobPb());

BigQuery.JobOption jobOptions = BigQuery.JobOption.fields(BigQuery.JobField.USER_EMAIL);
JobOption jobOptions = JobOption.fields(USER_EMAIL);

bigquery = options.getService();
bigquery.create(JobInfo.of(QueryJobConfiguration.of("SOME QUERY")), jobOptions);
Expand Down Expand Up @@ -1674,6 +1653,35 @@ public JobId get() {
.getJob(any(String.class), eq(id), eq((String) null), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobTryGetNotRandom() {
Map<BigQueryRpc.Option, ?> withStatisticOption = optionMap(JobOption.fields(STATISTICS));
final String id = "testCreateJobTryGet-id";
String query = "SELECT * in FOO";

when(bigqueryRpcMock.create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)))
.thenThrow(
new BigQueryException(
409,
"already exists, for some reason",
new RuntimeException("Already Exists: Job")));
when(bigqueryRpcMock.getJob(
any(String.class), eq(id), eq((String) null), eq(withStatisticOption)))
.thenReturn(
newJobPb()
.setId(id)
.setStatistics(new JobStatistics().setCreationTime(System.currentTimeMillis())));

bigquery = options.getService();
Job job =
((BigQueryImpl) bigquery).create(JobInfo.of(JobId.of(id), QueryJobConfiguration.of(query)));
assertThat(job).isNotNull();
assertThat(jobCapture.getValue().getJobReference().getJobId()).isEqualTo(id);
verify(bigqueryRpcMock).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
verify(bigqueryRpcMock)
.getJob(any(String.class), eq(id), eq((String) null), eq(withStatisticOption));
}

@Test
public void testCreateJobWithProjectId() {
JobInfo jobInfo =
Expand Down Expand Up @@ -1925,7 +1933,7 @@ public void testQueryRequestCompleted() throws InterruptedException {
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.thenReturn(jobResponsePb);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb);
when(bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
Expand All @@ -1946,8 +1954,7 @@ public void testQueryRequestCompleted() throws InterruptedException {
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));

verify(bigqueryRpcMock)
.listTableData(PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
Expand Down Expand Up @@ -2003,10 +2010,7 @@ public void testFastQueryMultiplePages() throws InterruptedException {
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR))))
PROJECT, DATASET, TABLE, optionMap(BigQuery.TableDataListOption.pageToken(CURSOR))))
.thenReturn(
new TableDataList()
.setPageToken(CURSOR)
Expand Down Expand Up @@ -2044,10 +2048,7 @@ public void testFastQueryMultiplePages() throws InterruptedException {
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
PROJECT, DATASET, TABLE, optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

Expand Down Expand Up @@ -2084,7 +2085,7 @@ public void testFastQuerySlowDdl() throws InterruptedException {
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(queryResultsResponsePb);
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
.thenReturn(new TableDataList().setRows(ImmutableList.of(TABLE_ROW)).setTotalRows(1L));
Expand All @@ -2108,8 +2109,7 @@ public void testFastQuerySlowDdl() throws InterruptedException {
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS);
}

Expand Down Expand Up @@ -2143,7 +2143,7 @@ public void testQueryRequestCompletedOptions() throws InterruptedException {
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());

when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb);
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap))
.thenReturn(
Expand All @@ -2164,8 +2164,7 @@ public void testQueryRequestCompletedOptions() throws InterruptedException {
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, optionMap);
}

Expand Down Expand Up @@ -2199,10 +2198,10 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.thenReturn(jobResponsePb1);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb1);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(responsePb2);
when(bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
Expand All @@ -2223,11 +2222,9 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
verify(bigqueryRpcMock)
.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap());
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
.getQueryResults(PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock)
.listTableData(PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
}
Expand Down