Skip to content

Commit

Permalink
fix: Prevent watch stream from emitting events after close. (#1471)
Browse files Browse the repository at this point in the history
* Prevent watch stream from emitting events after close.

* Cleanup

* Copyright

* Make classes final and hide visibility.

* Add comments

* Hand roll spy

* Add unit test, rename class, add comments.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Copyright

* Comment

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Comments

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
tom-andersen and gcf-owl-bot[bot] committed Nov 14, 2023
1 parent c1f478f commit ee3f8c0
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 45 deletions.
Expand Up @@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
bulkWriterExecutor);
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
bulkWriterExecutor.schedule(
() -> {
synchronized (lock) {
Expand Down
@@ -0,0 +1,125 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.gax.rpc.BidiStreamObserver;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.StreamController;
import java.util.function.Function;
import java.util.logging.Logger;

/**
* Conditionally pass through callbacks to wrapped `BidiStreamObserver`.
*
* <p>Due to the asynchronous nature of a stream, there can be a delay between closing a stream and
* the upstream no longer sending responses. Receiving callbacks after closing upstream can have
* undesirable consequences.
*
* <p>The underlying `ClientStream` can be called through the `SilenceableBidiStream`. Methods such
* as `send()` and `closeSend()` are exposed.
*
* <p>The `SilenceableBidiStream` wraps a `BidiStreamObserver`. This is helpful for situations where
* the observer should be detached from a stream. Instead of calling the `closeSend()` method, the
* `closeSendAndSilence()` method will silence the stream by preventing further callbacks including
* `onError` and `onComplete`.
*
* <p>If silenced, the observer could be safely attached to a new stream. This is useful for error
* handling where upstream must be stopped, but a new stream can continue to service the observer.
* In these cases, the old stream cannot be allowed to send more responses, and especially cannot be
* allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
* is finished.
*/
final class SilenceableBidiStream<RequestT, ResponseT>
implements BidiStreamObserver<RequestT, ResponseT> {

private final ClientStream<RequestT> stream;
private final BidiStreamObserver<RequestT, ResponseT> delegate;
private boolean silence = false;
private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());

SilenceableBidiStream(
BidiStreamObserver<RequestT, ResponseT> responseObserverT,
Function<BidiStreamObserver<RequestT, ResponseT>, ClientStream<RequestT>> streamSupplier) {
this.delegate = responseObserverT;
stream = streamSupplier.apply(this);
}

public boolean isSilenced() {
return silence;
}

public void send(RequestT request) {
LOGGER.info(stream.toString());
stream.send(request);
}

public void closeSend() {
LOGGER.info(stream::toString);
stream.closeSend();
}

public void closeSendAndSilence() {
LOGGER.info(stream::toString);
silence = true;
stream.closeSend();
}

@Override
public void onReady(ClientStream<RequestT> stream) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
} else {
delegate.onReady(stream);
}
}

@Override
public void onStart(StreamController controller) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
} else {
delegate.onStart(controller);
}
}

@Override
public void onResponse(ResponseT response) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
} else {
delegate.onResponse(response);
}
}

@Override
public void onError(Throwable t) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
} else {
delegate.onError(t);
}
}

@Override
public void onComplete() {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
} else {
delegate.onComplete();
}
}
}
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
Expand All @@ -59,7 +60,7 @@
* It synchronizes on its own instance so it is advisable not to use this class for external
* synchronization.
*/
class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
final class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
/**
* Target ID used by watch. Watch uses a fixed target id since we only support one target per
* stream. The actual target ID we use is arbitrary.
Expand All @@ -73,7 +74,7 @@ class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
private final ExponentialRetryAlgorithm backoff;
private final Target target;
private TimedAttemptSettings nextAttempt;
private ClientStream<ListenRequest> stream;
private SilenceableBidiStream<ListenRequest, ListenResponse> stream;

/** The sorted tree of DocumentSnapshots as sent in the last snapshot. */
private DocumentSet documentSet;
Expand Down Expand Up @@ -115,6 +116,8 @@ static class ChangeSet {
List<QueryDocumentSnapshot> updates = new ArrayList<>();
}

private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());

/**
* @param firestore The Firestore Database client.
* @param query The query that is used to order the document snapshots returned by this watch.
Expand Down Expand Up @@ -246,7 +249,16 @@ && affectsTarget(change.getTargetIdsList(), WATCH_TARGET_ID)) {
changeMap.put(ResourcePath.create(listenResponse.getDocumentRemove().getDocument()), null);
break;
case FILTER:
if (listenResponse.getFilter().getCount() != currentSize()) {
// Keep copy of counts for producing log message.
// The method currentSize() is computationally expensive, and should only be run once.
int filterCount = listenResponse.getFilter().getCount();
int currentSize = currentSize();
if (filterCount != currentSize) {
LOGGER.info(
() ->
String.format(
"filter: count mismatch filter count %d != current size %d",
filterCount, currentSize));
// We need to remove all the current results.
resetDocs();
// The filter didn't match, so re-issue the query.
Expand Down Expand Up @@ -318,7 +330,7 @@ private void resetDocs() {
resumeToken = null;

for (DocumentSnapshot snapshot : documentSet) {
// Mark each document as deleted. If documents are not deleted, they will be send again by
// Mark each document as deleted. If documents are not deleted, they will be sent again by
// the server.
changeMap.put(snapshot.getReference().getResourcePath(), null);
}
Expand All @@ -329,7 +341,7 @@ private void resetDocs() {
/** Closes the stream and calls onError() if the stream is still active. */
private void closeStream(final Throwable throwable) {
if (stream != null) {
stream.closeSend();
stream.closeSendAndSilence();
stream = null;
}

Expand Down Expand Up @@ -371,7 +383,7 @@ private void maybeReopenStream(Throwable throwable) {
/** Helper to restart the outgoing stream to the backend. */
private void resetStream() {
if (stream != null) {
stream.closeSend();
stream.closeSendAndSilence();
stream = null;
}

Expand All @@ -398,7 +410,12 @@ private void initStream() {
nextAttempt = backoff.createNextAttempt(nextAttempt);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
stream =
new SilenceableBidiStream<>(
Watch.this,
observer ->
firestore.streamRequest(
observer, firestore.getClient().listenCallable()));

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
Expand Down Expand Up @@ -459,6 +476,7 @@ private void pushSnapshot(final Timestamp readTime, ByteString nextResumeToken)
if (!hasPushed || !changes.isEmpty()) {
final QuerySnapshot querySnapshot =
QuerySnapshot.withChanges(query, readTime, documentSet, changes);
LOGGER.info(querySnapshot.toString());
userCallbackExecutor.execute(() -> listener.onEvent(querySnapshot, null));
hasPushed = true;
}
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.gax.rpc.BidiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStream;

public final class FirestoreSpy {

public final FirestoreImpl spy;
public BidiStreamObserver streamRequestBidiStreamObserver;

public FirestoreSpy(FirestoreOptions firestoreOptions) {
spy =
new FirestoreImpl(firestoreOptions) {
@Override
public <RequestT, ResponseT> ClientStream<RequestT> streamRequest(
BidiStreamObserver<RequestT, ResponseT> responseObserverT,
BidiStreamingCallable<RequestT, ResponseT> callable) {
streamRequestBidiStreamObserver = responseObserverT;
return super.streamRequest(responseObserverT, callable);
}
};
}
}

0 comments on commit ee3f8c0

Please sign in to comment.