Skip to content

Commit

Permalink
feat: add support to a few more specific StorageErrors for the Write …
Browse files Browse the repository at this point in the history
…API (#1563)

* feat: add support to a few more specific StorageErrors for the Write API

OFFSET_OUT_OF_RANGE
OFFSET_ALREADY_EXISTS
STREAM_NOT_FOUND

Towards b/220198094

* add integration test for streamNotFound

* add integration test for streamNotFound

* add more changes since backend changes have rolled out

* update clirr-ignored-differences.xml

* update IT

* nit

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* add error message match pattern

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
stephaniewang526 and gcf-owl-bot[bot] committed May 5, 2022
1 parent 60ed6b0 commit c26091e
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 16 deletions.
10 changes: 4 additions & 6 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7005</differenceType>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$SchemaMismatchedException</className>
<method>Exceptions$SchemaMismatchedException(java.lang.String, java.lang.String, java.lang.Throwable)</method>
<to>Exceptions$SchemaMismatchedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</to>
<method>Exceptions$SchemaMismatchedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamFinalizedException</className>
<method>Exceptions$StreamFinalizedException(java.lang.String, java.lang.String, java.lang.Throwable)</method>
<to>Exceptions$StreamFinalizedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</to>
<method>Exceptions$StreamFinalizedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
Expand All @@ -37,18 +38,23 @@ public static class StorageException extends StatusRuntimeException {

private final ImmutableMap<String, GrpcStatusCode> errors;
private final String streamName;
private final Long expectedOffset;
private final Long actualOffset;

private StorageException() {
this(null, null, null, ImmutableMap.of());
this(null, null, null, null, ImmutableMap.of());
}

private StorageException(
@Nullable Status grpcStatus,
@Nullable Metadata metadata,
@Nullable String streamName,
@Nullable Long expectedOffset,
@Nullable Long actualOffset,
ImmutableMap<String, GrpcStatusCode> errors) {
super(grpcStatus, metadata);
super(grpcStatus);
this.streamName = streamName;
this.expectedOffset = expectedOffset;
this.actualOffset = actualOffset;
this.errors = errors;
}

Expand All @@ -59,12 +65,20 @@ public ImmutableMap<String, GrpcStatusCode> getErrors() {
public String getStreamName() {
return streamName;
}

public long getExpectedOffset() {
return expectedOffset;
}

public long getActualOffset() {
return actualOffset;
}
}

/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
protected StreamFinalizedException(Status grpcStatus, Metadata metadata, String name) {
super(grpcStatus, metadata, name, ImmutableMap.of());
protected StreamFinalizedException(Status grpcStatus, String name) {
super(grpcStatus, name, null, null, ImmutableMap.of());
}
}

Expand All @@ -73,8 +87,31 @@ protected StreamFinalizedException(Status grpcStatus, Metadata metadata, String
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
protected SchemaMismatchedException(Status grpcStatus, Metadata metadata, String name) {
super(grpcStatus, metadata, name, ImmutableMap.of());
protected SchemaMismatchedException(Status grpcStatus, String name) {
super(grpcStatus, name, null, null, ImmutableMap.of());
}
}

/** Offset already exists. */
public static final class OffsetAlreadyExists extends StorageException {
protected OffsetAlreadyExists(
Status grpcStatus, String name, Long expectedOffset, Long actualOffset) {
super(grpcStatus, name, expectedOffset, actualOffset, ImmutableMap.of());
}
}

/** Offset out of range. */
public static final class OffsetOutOfRange extends StorageException {
protected OffsetOutOfRange(
Status grpcStatus, String name, Long expectedOffset, Long actualOffset) {
super(grpcStatus, name, expectedOffset, actualOffset, ImmutableMap.of());
}
}

/** Stream is not found. */
public static final class StreamNotFound extends StorageException {
protected StreamNotFound(Status grpcStatus, String name) {
super(grpcStatus, name, null, null, ImmutableMap.of());
}
}

Expand Down Expand Up @@ -106,12 +143,48 @@ public static StorageException toStorageException(
if (error == null) {
return null;
}
String streamName = error.getEntity();
// The error message should have Entity but it's missing from the message for
// OFFSET_ALREADY_EXISTS
// TODO: Simplify the logic below when backend fixes passing Entity for OFFSET_ALREADY_EXISTS
// error
String errorMessage =
error.getErrorMessage().indexOf("Entity") > 0
? error.getErrorMessage().substring(0, error.getErrorMessage().indexOf("Entity")).trim()
: error.getErrorMessage().trim();

// Ensure that erro message has the desirable pattern for parsing
String errormessagePatternString = "expected offset [0-9]+, received [0-9]+";
Pattern errorMessagePattern = Pattern.compile(errormessagePatternString);
Matcher errorMessageMatcher = errorMessagePattern.matcher(errorMessage);

Long expectedOffet;
Long actualOffset;
if (!errorMessageMatcher.find()) {
expectedOffet = -1L;
actualOffset = -1L;
} else {
expectedOffet =
Long.parseLong(
errorMessage.substring(
errorMessage.lastIndexOf("offset") + 7, errorMessage.lastIndexOf(",")));
actualOffset = Long.parseLong(errorMessage.substring(errorMessage.lastIndexOf(" ") + 1));
}
switch (error.getCode()) {
case STREAM_FINALIZED:
return new StreamFinalizedException(grpcStatus, null, error.getEntity());
return new StreamFinalizedException(grpcStatus, streamName);

case STREAM_NOT_FOUND:
return new StreamNotFound(grpcStatus, streamName);

case SCHEMA_MISMATCH_EXTRA_FIELDS:
return new SchemaMismatchedException(grpcStatus, null, error.getEntity());
return new SchemaMismatchedException(grpcStatus, streamName);

case OFFSET_OUT_OF_RANGE:
return new OffsetOutOfRange(grpcStatus, streamName, expectedOffet, actualOffset);

case OFFSET_ALREADY_EXISTS:
return new OffsetAlreadyExists(grpcStatus, streamName, expectedOffet, actualOffset);

default:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
Expand Down Expand Up @@ -901,6 +903,75 @@ public void testStreamFinalizedError()
}
}

@Test
public void testOffsetAlreadyExistsError()
throws IOException, ExecutionException, InterruptedException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build()) {
// Append once with correct offset
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
response.get();
// Append again with the same offset
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
try {
response2.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertEquals(Exceptions.OffsetAlreadyExists.class, e.getCause().getClass());
Exceptions.OffsetAlreadyExists actualError = (OffsetAlreadyExists) e.getCause();
assertNotNull(actualError.getStreamName());
assertEquals(1, actualError.getExpectedOffset());
assertEquals(0, actualError.getActualOffset());
assertEquals(Code.ALREADY_EXISTS, Status.fromThrowable(e.getCause()).getCode());
assertThat(e.getCause().getMessage())
.contains("The offset is within stream, expected offset 1, received 0");
}
}
}

@Test
public void testOffsetOutOfRangeError() throws IOException, InterruptedException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build()) {
// Append with an out of range offset
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 10);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertEquals(Exceptions.OffsetOutOfRange.class, e.getCause().getClass());
Exceptions.OffsetOutOfRange actualError = (OffsetOutOfRange) e.getCause();
assertNotNull(actualError.getStreamName());
assertEquals(0, actualError.getExpectedOffset());
assertEquals(10, actualError.getActualOffset());
assertEquals(Code.OUT_OF_RANGE, Status.fromThrowable(e.getCause()).getCode());
assertThat(e.getCause().getMessage())
.contains("The offset is beyond stream, expected offset 0, received 10");
}
}
}

@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
Expand Down

0 comments on commit c26091e

Please sign in to comment.