New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Prevent watch stream from emitting events after close. #1471
fix: Prevent watch stream from emitting events after close. #1471
Conversation
@@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) { | |||
bulkWriterExecutor); | |||
} else { | |||
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize()); | |||
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000)); | |||
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't directly related to PR. But since I was adding logging...
I noticed this log was building a string, even if log level is not fine. A lambda allows lazy evaluation, such that production environments don't suffer work when logging is set to a more coarse level.
@@ -84,6 +86,10 @@ | |||
|
|||
@RunWith(MockitoJUnitRunner.class) | |||
public class WatchTest { | |||
|
|||
@Rule | |||
public Timeout timeout = new Timeout(1, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While working through problem, I introduced bug that made unit tests hang. To make sure tests completed, I added timeout to fail test if they don't run within 1 second.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also look into if we can add unit tests for the new stream class.
import java.util.function.Function; | ||
import java.util.logging.Logger; | ||
|
||
final class SuppressibleBidiStream<RequestT, ResponseT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a class level comment to explain what this class does and why it is neccessary.
|
||
private final ClientStream<ListenRequest> stream; | ||
private final BidiStreamObserver<RequestT, ResponseT> delegate; | ||
private boolean silence = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the class is now call Suppressbile..
, this should be isSuppressed
?
stream.closeSend(); | ||
} | ||
|
||
public void closeAndSilence() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, use closeAndSuppress
instead.
@Override | ||
public void onReady(ClientStream<RequestT> stream) { | ||
if (silence) { | ||
LOGGER.info(() -> String.format("Silenced: %s", stream)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for the log message.
if (listenResponse.getFilter().getCount() != currentSize()) { | ||
int filterCount = listenResponse.getFilter().getCount(); | ||
int currentSize = currentSize(); | ||
if (filterCount != currentSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inlining currentSize()
seems better IMO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currentSize()
hides a lot of computation:
private int currentSize() {
ChangeSet changeSet = extractChanges(Timestamp.now());
return documentSet.size() + changeSet.adds.size() - changeSet.deletes.size();
}
private ChangeSet extractChanges(Timestamp readTime) {
ChangeSet changeSet = new ChangeSet();
for (Entry<ResourcePath, Document> change : changeMap.entrySet()) {
if (change.getValue() == null) {
if (documentSet.contains(change.getKey())) {
changeSet.deletes.add(documentSet.getDocument(change.getKey()));
}
continue;
}
QueryDocumentSnapshot snapshot =
QueryDocumentSnapshot.fromDocument(firestore, readTime, change.getValue());
if (documentSet.contains(change.getKey())) {
changeSet.updates.add(snapshot);
} else {
changeSet.adds.add(snapshot);
}
}
return changeSet;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant if (filterCount != currentSize())
listener.eventsCountDownLatch.awaitInitialEvents(); | ||
listener | ||
.assertionsForLastEvent() | ||
.noError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more comments explaining why we have this test and what would happen if SuppressiableStream is not used.
|
||
// A race condition will sometimes throw an error if the SuppressibleBidiStream does not | ||
// silence the old stream. This can be caused by `Preconditions.checkState(stream == null)` | ||
// in Watch class. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explanation added here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor nits, but LGTM in general, thanks!
* allowed to send `onError` or `onComplete` since that would signal the downstream that the stream | ||
* is finished. | ||
* | ||
* @param <RequestT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think these @param
s are useful here.
if (listenResponse.getFilter().getCount() != currentSize()) { | ||
int filterCount = listenResponse.getFilter().getCount(); | ||
int currentSize = currentSize(); | ||
if (filterCount != currentSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant if (filterCount != currentSize())
|
||
@Mock ClientStream<Integer> mockClientStream; | ||
|
||
SilenceableBidiStream<Integer, String> sut; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does sut
stand for here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System Under Test
b/288270811