Skip to content

Commit

Permalink
fix: catch all throwables so version mismatch won't hang the client (#…
Browse files Browse the repository at this point in the history
…1402)

* fix: catch all throwables so version mismatch won't hang the client

* create a SafeResponseObserver

* format

* extend SafeResponseObserver

* catch stream cancellation

* update error log

* update

* throw on onStart

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix version

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mutianf and gcf-owl-bot[bot] committed Oct 7, 2022
1 parent e62e91b commit c03b8a4
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 19 deletions.
7 changes: 6 additions & 1 deletion google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -49,7 +49,7 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer$Builder</className>
</difference>
change method args is ok because HeaderTracerStreamingCallable is InternalApi
<!-- change method args is ok because HeaderTracerStreamingCallable is InternalApi -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable</className>
Expand All @@ -76,4 +76,9 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver</className>
<to>com/google/api/gax/rpc/StateCheckingResponseObserver</to>
</difference>
</differences>
Expand Up @@ -42,31 +42,32 @@ public void call(
innerCallable.call(request, observer, context);
}

private class ReadRowsConvertExceptionResponseObserver<RowT> implements ResponseObserver<RowT> {
private class ReadRowsConvertExceptionResponseObserver<RowT> extends SafeResponseObserver<RowT> {

private final ResponseObserver<RowT> outerObserver;

ReadRowsConvertExceptionResponseObserver(ResponseObserver<RowT> outerObserver) {
super(outerObserver);
this.outerObserver = outerObserver;
}

@Override
public void onStart(StreamController controller) {
protected void onStartImpl(StreamController controller) {
outerObserver.onStart(controller);
}

@Override
public void onResponse(RowT response) {
protected void onResponseImpl(RowT response) {
outerObserver.onResponse(response);
}

@Override
public void onError(Throwable t) {
protected void onErrorImpl(Throwable t) {
outerObserver.onError(convertException(t));
}

@Override
public void onComplete() {
protected void onCompleteImpl() {
outerObserver.onComplete();
}
}
Expand Down
@@ -0,0 +1,123 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Base implementation of {@link ResponseObserver} that checks the state and catches all the
* throwables.
*/
@InternalApi
public abstract class SafeResponseObserver<ResponseT> implements ResponseObserver<ResponseT> {

private static final Logger LOGGER = Logger.getLogger(SafeResponseObserver.class.getName());
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AtomicBoolean isClosed = new AtomicBoolean(false);
private StreamController streamController;
private ResponseObserver outerObserver;

public SafeResponseObserver(ResponseObserver outerObserver) {
this.outerObserver = outerObserver;
}

@Override
public final void onStart(StreamController streamController) {
if (!isStarted.compareAndSet(false, true)) {
throw new IllegalStateException("A stream is already started");
}

this.streamController = streamController;
try {
onStartImpl(streamController);
} catch (Throwable t) {
if (!isClosed.compareAndSet(false, true)) {
logException("Tried to cancel a closed stream");
return;
}
streamController.cancel();
outerObserver.onError(t);
}
}

@Override
public final void onResponse(ResponseT response) {
if (isClosed.get()) {
logException("Received a response after the stream is closed");
return;
}
try {
onResponseImpl(response);
} catch (Throwable t1) {
try {
if (!isClosed.compareAndSet(false, true)) {
logException("Tried to cancel a closed stream");
return;
}
streamController.cancel();
} catch (Throwable t2) {
t1.addSuppressed(t2);
}
outerObserver.onError(t1);
}
}

@Override
public final void onError(Throwable throwable) {
if (!isClosed.compareAndSet(false, true)) {
logException("Received error after the stream is closed");
return;
}

try {
onErrorImpl(throwable);
} catch (Throwable t) {
throwable.addSuppressed(t);
outerObserver.onError(throwable);
}
}

@Override
public final void onComplete() {
if (!isClosed.compareAndSet(false, true)) {
logException("Tried to double close the stream");
return;
}

try {
onCompleteImpl();
} catch (Throwable t) {
outerObserver.onError(t);
}
}

private void logException(String message) {
LOGGER.log(Level.WARNING, message, new IllegalStateException(message));
}

protected abstract void onStartImpl(StreamController streamController);

protected abstract void onResponseImpl(ResponseT response);

protected abstract void onErrorImpl(Throwable throwable);

protected abstract void onCompleteImpl();
}
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void call(
}
}

private class BigtableTracerResponseObserver<ResponseT> implements ResponseObserver<ResponseT> {
private class BigtableTracerResponseObserver<ResponseT> extends SafeResponseObserver<ResponseT> {

private final BigtableTracer tracer;
private final ResponseObserver<ResponseT> outerObserver;
Expand All @@ -78,26 +79,28 @@ private class BigtableTracerResponseObserver<ResponseT> implements ResponseObser
ResponseObserver<ResponseT> observer,
BigtableTracer tracer,
GrpcResponseMetadata metadata) {
super(observer);

this.tracer = tracer;
this.outerObserver = observer;
this.responseMetadata = metadata;
}

@Override
public void onStart(final StreamController controller) {
protected void onStartImpl(final StreamController controller) {
TracedStreamController tracedController = new TracedStreamController(controller, tracer);
outerObserver.onStart(tracedController);
}

@Override
public void onResponse(ResponseT response) {
protected void onResponseImpl(ResponseT response) {
Stopwatch stopwatch = Stopwatch.createStarted();
outerObserver.onResponse(response);
tracer.afterResponse(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

@Override
public void onError(Throwable t) {
protected void onErrorImpl(Throwable t) {
// server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
// so it's not checking trailing metadata here.
Metadata metadata = responseMetadata.getMetadata();
Expand All @@ -122,13 +125,14 @@ public void onError(Throwable t) {
}
}
} catch (InvalidProtocolBufferException e) {
t.addSuppressed(t);
}

outerObserver.onError(t);
}

@Override
public void onComplete() {
protected void onCompleteImpl() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
Expand All @@ -151,6 +155,9 @@ public void onComplete() {
}
}
} catch (InvalidProtocolBufferException e) {
// InvalidProtocolBufferException will only throw if something changed on
// the server side. Location info won't be populated as a result. Ignore
// this error and don't bubble it up to user.
}

outerObserver.onComplete();
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;

/**
* Remove the special marker rows generated by {@link RowMergingCallable}.
Expand All @@ -47,17 +48,18 @@ public void call(
innerCallable.call(request, innerObserver, context);
}

private class FilteringResponseObserver implements ResponseObserver<RowT> {
private class FilteringResponseObserver extends SafeResponseObserver<RowT> {
private final ResponseObserver<RowT> outerObserver;
private StreamController innerController;
private boolean autoFlowControl = true;

FilteringResponseObserver(ResponseObserver<RowT> outerObserver) {
super(outerObserver);
this.outerObserver = outerObserver;
}

@Override
public void onStart(final StreamController controller) {
protected void onStartImpl(final StreamController controller) {
innerController = controller;

outerObserver.onStart(
Expand All @@ -81,7 +83,7 @@ public void request(int count) {
}

@Override
public void onResponse(RowT response) {
protected void onResponseImpl(RowT response) {
if (rowAdapter.isScanMarkerRow(response)) {
if (!autoFlowControl) {
innerController.request(1);
Expand All @@ -92,12 +94,12 @@ public void onResponse(RowT response) {
}

@Override
public void onError(Throwable t) {
protected void onErrorImpl(Throwable t) {
outerObserver.onError(t);
}

@Override
public void onComplete() {
protected void onCompleteImpl() {
outerObserver.onComplete();
}
}
Expand Down
Expand Up @@ -17,8 +17,8 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -56,8 +56,7 @@
* }</pre>
*/
@InternalApi
public class ReframingResponseObserver<InnerT, OuterT>
extends StateCheckingResponseObserver<InnerT> {
public class ReframingResponseObserver<InnerT, OuterT> extends SafeResponseObserver<InnerT> {
// Used as a nonblocking mutex for deliver().
// 0 means unlocked
// 1 means locked without contention
Expand Down Expand Up @@ -97,6 +96,7 @@ public class ReframingResponseObserver<InnerT, OuterT>

public ReframingResponseObserver(
ResponseObserver<OuterT> observer, Reframer<OuterT, InnerT> reframer) {
super(observer);
this.outerResponseObserver = observer;
this.reframer = reframer;
}
Expand Down

0 comments on commit c03b8a4

Please sign in to comment.