Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevent watch stream from emitting events after close. #1471

Merged
merged 12 commits into from Nov 14, 2023

Conversation

tom-andersen
Copy link
Contributor

@tom-andersen tom-andersen commented Nov 8, 2023

b/288270811

@product-auto-label product-auto-label bot added size: l Pull request size is large. api: firestore Issues related to the googleapis/java-firestore API. labels Nov 8, 2023
@@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
bulkWriterExecutor);
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't directly related to PR. But since I was adding logging...

I noticed this log was building a string, even if log level is not fine. A lambda allows lazy evaluation, such that production environments don't suffer work when logging is set to a more coarse level.

@@ -84,6 +86,10 @@

@RunWith(MockitoJUnitRunner.class)
public class WatchTest {

@Rule
public Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While working through problem, I introduced bug that made unit tests hang. To make sure tests completed, I added timeout to fail test if they don't run within 1 second.

@tom-andersen tom-andersen changed the title Prevent watch stream from emitting events after close. fix: Prevent watch stream from emitting events after close. Nov 8, 2023
@tom-andersen tom-andersen added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Nov 8, 2023
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Nov 8, 2023
@tom-andersen tom-andersen marked this pull request as ready for review November 9, 2023 14:07
@tom-andersen tom-andersen requested a review from a team as a code owner November 9, 2023 14:07
Copy link
Contributor

@wu-hui wu-hui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also look into if we can add unit tests for the new stream class.

import java.util.function.Function;
import java.util.logging.Logger;

final class SuppressibleBidiStream<RequestT, ResponseT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a class level comment to explain what this class does and why it is neccessary.


private final ClientStream<ListenRequest> stream;
private final BidiStreamObserver<RequestT, ResponseT> delegate;
private boolean silence = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the class is now call Suppressbile.., this should be isSuppressed?

stream.closeSend();
}

public void closeAndSilence() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, use closeAndSuppress instead.

@Override
public void onReady(ClientStream<RequestT> stream) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for the log message.

if (listenResponse.getFilter().getCount() != currentSize()) {
int filterCount = listenResponse.getFilter().getCount();
int currentSize = currentSize();
if (filterCount != currentSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlining currentSize() seems better IMO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentSize() hides a lot of computation:

  private int currentSize() {
    ChangeSet changeSet = extractChanges(Timestamp.now());
    return documentSet.size() + changeSet.adds.size() - changeSet.deletes.size();
  }
  private ChangeSet extractChanges(Timestamp readTime) {
    ChangeSet changeSet = new ChangeSet();

    for (Entry<ResourcePath, Document> change : changeMap.entrySet()) {
      if (change.getValue() == null) {
        if (documentSet.contains(change.getKey())) {
          changeSet.deletes.add(documentSet.getDocument(change.getKey()));
        }
        continue;
      }

      QueryDocumentSnapshot snapshot =
          QueryDocumentSnapshot.fromDocument(firestore, readTime, change.getValue());

      if (documentSet.contains(change.getKey())) {
        changeSet.updates.add(snapshot);
      } else {
        changeSet.adds.add(snapshot);
      }
    }

    return changeSet;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant if (filterCount != currentSize())

listener.eventsCountDownLatch.awaitInitialEvents();
listener
.assertionsForLastEvent()
.noError()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more comments explaining why we have this test and what would happen if SuppressiableStream is not used.


// A race condition will sometimes throw an error if the SuppressibleBidiStream does not
// silence the old stream. This can be caused by `Preconditions.checkState(stream == null)`
// in Watch class.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explanation added here.

@gcf-owl-bot gcf-owl-bot bot requested a review from a team as a code owner November 9, 2023 21:09
Copy link
Contributor

@wu-hui wu-hui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor nits, but LGTM in general, thanks!

* allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
* is finished.
*
* @param <RequestT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think these @params are useful here.

if (listenResponse.getFilter().getCount() != currentSize()) {
int filterCount = listenResponse.getFilter().getCount();
int currentSize = currentSize();
if (filterCount != currentSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant if (filterCount != currentSize())


@Mock ClientStream<Integer> mockClientStream;

SilenceableBidiStream<Integer, String> sut;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does sut stand for here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System Under Test

@tom-andersen tom-andersen merged commit ee3f8c0 into main Nov 14, 2023
21 of 22 checks passed
@tom-andersen tom-andersen deleted the tomandersen/FixWatchFilterMismatchRetryLogic branch November 14, 2023 17:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: firestore Issues related to the googleapis/java-firestore API. size: l Pull request size is large.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants