Skip to content

Commit

Permalink
fix: BigQueryTemplate now closes its BigQueryJsonDataWriter (#2765)
Browse files Browse the repository at this point in the history
* fix: remove unnecessary 'throws' from BigQueryJsonDataWriter.close()

* fix: BigQueryTemplate now closes its BigQueryJsonDataWriter

* chore: add error logic coverage

* fix: use BigQueryException(String, Exception) when throwing.
  • Loading branch information
burkedavison committed Apr 30, 2024
1 parent f7c3710 commit 1e0c206
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public String getStreamName() {
}

@Override
public void close() throws Exception {
public void close() {
// Close the connection to the server.
streamWriter.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,58 +294,55 @@ public WriteApiResponse getWriteApiResponse(String tableName, InputStream jsonIn
TableName parentTable =
TableName.of(bigQuery.getOptions().getProjectId(), datasetName, tableName);

// Initialize a write stream for the specified table.
BigQueryJsonDataWriter writer = getBigQueryJsonDataWriter(parentTable);

try {
// Write data in batches. Ref: https://cloud.google.com/bigquery/quotas#write-api-limits
long offset = 0;
int currentBatchSize = 0;

BufferedReader jsonReader = new BufferedReader(new InputStreamReader(jsonInputStream));
String jsonLine = null;
JSONArray jsonBatch = new JSONArray();
while ((jsonLine = jsonReader.readLine()) != null) { // read the input stream line by line
JSONObject jsonObj = new JSONObject(jsonLine); // cast the JSON string into JSON Object
jsonBatch.put(jsonObj);
currentBatchSize++;
if (currentBatchSize
== getBatchSize()) { // append the batch, increment the offset and reset
// the batch
try (BigQueryJsonDataWriter writer = getBigQueryJsonDataWriter(parentTable)) {
try {
// Write data in batches. Ref: https://cloud.google.com/bigquery/quotas#write-api-limits
long offset = 0;
int currentBatchSize = 0;

BufferedReader jsonReader = new BufferedReader(new InputStreamReader(jsonInputStream));
String jsonLine = null;
JSONArray jsonBatch = new JSONArray();
while ((jsonLine = jsonReader.readLine()) != null) { // read the inputstream line by line
JSONObject jsonObj = new JSONObject(jsonLine); // cast the JSON string into JSON Object
jsonBatch.put(jsonObj);
currentBatchSize++;
if (currentBatchSize == getBatchSize()) {
// append the batch, increment the offset and reset the batch
writer.append(jsonBatch, offset);
offset += jsonBatch.length();
jsonBatch = new JSONArray();
currentBatchSize = 0;
}
}

if (jsonBatch.length() != 0) {
// there might be records less than JSON_STREAM_WRITER_BATCH_SIZE, append those as well
writer.append(jsonBatch, offset);
offset += jsonBatch.length();
jsonBatch = new JSONArray();
currentBatchSize = 0;
}
}

if (jsonBatch.length()
!= 0) { // there might be records less than JSON_STREAM_WRITER_BATCH_SIZE, append those as
// well
writer.append(jsonBatch, offset);
} catch (Exception e) {
throw new BigQueryException("Failed to append records.", e);
}

} catch (Exception e) {
throw new BigQueryException("Failed to append records. \n" + e);
}
// Finalize the stream before committing it
writer.finalizeWriteStream();

// Finalize the stream before committing it
writer.finalizeWriteStream();
BatchCommitWriteStreamsResponse commitResponse = getCommitResponse(parentTable, writer);
// If the response does not have a commit time, it means the commit operation failed.
if (!commitResponse.hasCommitTime()) {
for (StorageError err : commitResponse.getStreamErrorsList()) {
apiResponse.addError(err); // this object is returned to the user
}
}

BatchCommitWriteStreamsResponse commitResponse = getCommitResponse(parentTable, writer);
// If the response does not have a commit time, it means the commit operation failed.
if (!commitResponse.hasCommitTime()) {
for (StorageError err : commitResponse.getStreamErrorsList()) {
apiResponse.addError(err); // this object is returned to the user
// set isSuccessful flag to true of there were no errors
if (apiResponse.getErrors().isEmpty()) {
apiResponse.setSuccessful(true);
}
}

// set isSuccessful flag to true of there were no errors
if (apiResponse.getErrors().isEmpty()) {
apiResponse.setSuccessful(true);
return apiResponse;
}

return apiResponse;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.ServiceOptions;
Expand All @@ -43,6 +44,7 @@
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.spring.bigquery.core.BigQueryJsonDataWriter;
import com.google.cloud.spring.bigquery.core.BigQueryTemplate;
Expand Down Expand Up @@ -333,4 +335,43 @@ void writeJsonStreamFailsOnGenericWritingException()
bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream, getDefaultSchema());
assertThat(futRes).withFailMessage("boom!").failsWithin(Duration.ofSeconds(1));
}

@Test
void testWriterAppendsErrors() throws Exception {
  // Hand the template spy a mocked writer for any table it asks about.
  BigQueryJsonDataWriter writer = mock(BigQueryJsonDataWriter.class);
  doReturn(writer).when(bqTemplateSpy).getBigQueryJsonDataWriter(any(TableName.class));

  // Build a commit response with no commit time and one stream error; a missing
  // commit time is how getWriteApiResponse() detects a failed commit.
  StorageError storageError = StorageError.newBuilder().build();
  BatchCommitWriteStreamsResponse failedCommit =
      BatchCommitWriteStreamsResponse.getDefaultInstance().toBuilder()
          .clearCommitTime()
          .addStreamErrors(storageError)
          .build();
  doReturn(failedCommit)
      .when(bqTemplateSpy)
      .getCommitResponse(any(TableName.class), any(BigQueryJsonDataWriter.class));

  WriteApiResponse apiRes =
      bqTemplateSpy.getWriteApiResponse(
          TABLE, new ByteArrayInputStream(newLineSeperatedJson.getBytes()));

  // The stream error must be surfaced to the caller and the response flagged unsuccessful.
  assertThat(apiRes.isSuccessful()).isFalse();
  assertThat(apiRes.getErrors()).contains(storageError);
}

@Test
void testWriterIsClosed() throws Exception {
  // Hand the template spy a mocked writer for any table it asks about.
  BigQueryJsonDataWriter writer = mock(BigQueryJsonDataWriter.class);
  doReturn(writer).when(bqTemplateSpy).getBigQueryJsonDataWriter(any(TableName.class));

  // A default commit response carries no stream errors, so no errors should be
  // recorded on the returned WriteApiResponse.
  doReturn(BatchCommitWriteStreamsResponse.getDefaultInstance())
      .when(bqTemplateSpy)
      .getCommitResponse(any(TableName.class), any(BigQueryJsonDataWriter.class));

  WriteApiResponse apiRes =
      bqTemplateSpy.getWriteApiResponse(
          TABLE, new ByteArrayInputStream(newLineSeperatedJson.getBytes()));

  // Fix: the original assigned apiRes but never used it (dead local). Assert on
  // the response so the happy path is actually checked, not just the close() call.
  assertThat(apiRes.getErrors()).isEmpty();
  // The writer must be closed even when everything succeeds — this is the
  // behavior the commit's try-with-resources change guarantees.
  verify(writer).close();
}
}

0 comments on commit 1e0c206

Please sign in to comment.