
core: Rework retry memory leak fix in https://github.com/grpc/grpc-java/pull/9360 to send fewer FIN packets #10209

Open · wants to merge 1 commit into master from retriable-stream-leak

Conversation

@jtk54 jtk54 commented May 21, 2023

Rework retry memory leak fix in https://github.com/grpc/grpc-java/pull/9360 to send fewer FIN packets.

Without the explicit stream flush after writes in RetriableStream, buffered data is orphaned on RetriableStream cancellation, since calls to writeRaw() can happen after dispose(). We should verify the framer is not closed before continuing with writes, since writes and dispose() can be interleaved across threads.
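
For context, a rough sketch of the guard being proposed, with the surrounding writeRaw() loop paraphrased and simplified (only the isClosed() early exit is the change under discussion; field names follow MessageFramer):

    private void writeRaw(byte[] b, int off, int len) {
      while (len > 0) {
        // Proposed early exit: if dispose() already ran, possibly on another
        // thread, stop instead of allocating a buffer nobody will release.
        if (isClosed()) {
          return;
        }
        if (buffer != null && buffer.writableBytes() == 0) {
          commitToSink(false, false);
        }
        if (buffer == null) {
          // This is the allocation that leaks when it races with dispose().
          buffer = bufferAllocator.allocate(len);
        }
        int toWrite = Math.min(len, buffer.writableBytes());
        buffer.write(b, off, toWrite);
        off += toWrite;
        len -= toWrite;
      }
    }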

linux-foundation-easycla bot commented May 21, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: jtk54 / name: Jacob Kiefer (84cbeec)

jtk54 (Author) commented May 21, 2023

@ejona86 PTAL.

jtk54 force-pushed the retriable-stream-leak branch 2 times, most recently from 5bcde63 to e3a54b4 on May 21, 2023
jtk54 force-pushed the retriable-stream-leak branch 2 times, most recently from daec1df to f66e8df on May 24, 2023
jtk54 (Author) commented May 25, 2023

Friendly ping -- test coverage is fixed.

@ejona86 ejona86 (Member) left a comment

Have you investigated an AbstractClientStream test? You'd have an allocator that tracks the buffers created, and extend ByteWritableBuffer to track whether it has been closed.
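
A sketch of that setup, assuming hypothetical helper names (WritableBufferAllocator and ByteWritableBuffer are the existing test seams; List/ArrayList from java.util):

    // Records every buffer handed out so the test can assert that each one
    // was released after stream.cancel().
    class TrackingBuffer extends ByteWritableBuffer {
      volatile boolean released;

      TrackingBuffer(int capacity) {
        super(capacity);
      }

      @Override
      public void release() {
        released = true;
        super.release();
      }
    }

    class TrackingAllocator implements WritableBufferAllocator {
      final List<TrackingBuffer> allocated = new ArrayList<>();

      @Override
      public WritableBuffer allocate(int capacityHint) {
        TrackingBuffer buffer = new TrackingBuffer(capacityHint);
        allocated.add(buffer);
        return buffer;
      }
    }

    // After driving a write plus cancel() through the stream, assert roughly:
    //   for (TrackingBuffer b : allocator.allocated) { assertTrue(b.released); }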

core/src/main/java/io/grpc/internal/MessageFramer.java (inline diff):

    -  private int writeUncompressed(InputStream message, int messageLength) throws IOException {
    +  @VisibleForTesting
    +  int writeUncompressed(InputStream message, int messageLength) throws IOException {
    +    if (isClosed()) {
Member:

Why are you making this change? I have already mentioned we have essentially this same thing in AbstractStream. What value does it add to have it here too?

You add it to writeRaw(), which is only called from writeKnownLengthUncompressed() which already has the isClosed() check. What is the objective?

jtk54 (Author):

This was trying to avoid the allocation at https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/MessageFramer.java#L230, which is probably insufficient on its own. I moved all the early-exit logic into writeRaw() now.

Member:

But why do you need the check there at all? I pointed out there is already a check. I don't understand what your goal is here. Are you trying to deal with the cross-thread race?

jtk54 (Author) commented May 26, 2023

Added a new cancel test for AbstractClientStream and tried to clean up the approach so it makes more sense.

jtk54 force-pushed the retriable-stream-leak branch 2 times, most recently from df1fcdc to 82c2f0e on May 26, 2023
New test (inline diff):

    public void writeRawNoopIfDisposed() {
      byte[] bytes = new byte[]{3, 14};
      framer.dispose();
      framer.writeRaw(bytes, 0, bytes.length);
Member:

Don't dig this deep into the implementation. Just call one of the public methods.

Contributor:

Note that since the public method is going to check for being closed you'll need to start the write, have the bufferAllocator wait, call dispose(), wake up the bufferAllocator and check that it stops the next time around.
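
A sketch of that sequencing (hypothetical scaffolding; the latches and AtomicInteger come from java.util.concurrent, and framer.writePayload()/dispose() are the real entry points):

    // Block the allocator mid-write, run dispose() from the test thread, then
    // verify the framer stops allocating once the write resumes.
    CountDownLatch allocEntered = new CountDownLatch(1);
    CountDownLatch disposeDone = new CountDownLatch(1);
    AtomicInteger allocations = new AtomicInteger();

    WritableBufferAllocator blockingAllocator = capacityHint -> {
      allocations.incrementAndGet();
      allocEntered.countDown();   // the write reached the allocator
      try {
        disposeDone.await();      // park until dispose() has run
      } catch (InterruptedException e) {
        throw new AssertionError(e);
      }
      return new ByteWritableBuffer(capacityHint);
    };

    // Writer thread: framer.writePayload(new ByteArrayInputStream(payload));
    // Test thread:   allocEntered.await(); framer.dispose(); disposeDone.countDown();
    // Join the writer, then assert allocations.get() == 1, i.e. the framer did
    // not allocate again after dispose().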


jtk54 (Author) commented May 26, 2023

> Are you trying to deal with the cross-thread race?

Yes. Removing the explicit stream.flush() and only adding framer.dispose() to stream.cancel() is not sufficient to fix the leak, because messages are orphaned in this code path. In the repro logs (#9340), framer.writeRaw() calls happen after framer.dispose(), and if this logic is removed, allocated buffers are orphaned and the leak reproduces regardless of the AbstractStream check. The allocations in MessageFramer do not check whether the framer is closed, only whether the buffer is null (example: https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/MessageFramer.java#L230), which is why I think the leak happens.
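
Laid out as a timeline, the interleaving being described (illustrative, based on the repro logs referenced above):

    app thread                                cancel path (RetriableStream)
    ----------------------------------        -----------------------------
    writeKnownLengthUncompressed()
      isClosed() -> false
                                              framer.dispose()  // closed = true
      writeRaw(...)
        buffer == null                        // null check passes
        buffer = bufferAllocator.allocate(n)  // allocated after dispose();
                                              // nothing releases it -> leak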

ejona86 (Member) commented May 30, 2023

Sending my comment as it is right now. I think the case you've narrowed this down to is "a problem," and I'm not quite sure how best to address it.

> The allocations in MessageFramer do not check if the framer is closed, only if the buffer is null

They do check, only earlier. It just isn't thread-safe. But the code isn't thread-safe, so I think you are explaining that you aren't solving the problem and instead making it less likely.

If we were to go this route, it would need a big comment describing what we're trying to solve, because the code right now looks unnecessary.

I think the important part is "why is retry involved?" Not even hedging; just plain RetriableStream, and not even when retrying. I think that's because we commit the noop stream, which prevents the flush from being delivered.

In the non-RetriableStream case, the cancellation doesn't impact the framer at all, which will be flushed appropriately by the application thread. For retries, we could commit an in-progress stream (if there is one) instead of noop, which would allow the application thread to flush. But that wouldn't work for hedging.

I'm even considering not committing. That would cause the flush()s to still propagate to all the streams, on the proper threads. But it also doesn't solve the case where one hedge is committed so the others are cancelled.

jtk54 (Author) commented May 31, 2023

> so I think you are explaining that you aren't solving the problem and instead making it less likely.

Yeah, agreed. It's probably better to address this a different way. I'm going to familiarize myself with the options you are suggesting + hedging and see if I can figure something out. If that doesn't turn up a solution I will reach out to discuss further.

ejona86 (Member) commented May 31, 2023

Thinking more about this today, assuming the change isn't really annoying/hard/complex, a "quick fix" for retries-only may be okay. We can leave the "always flush after write" for hedging. That approach would at least let us fix the readily solvable cases while not harming hedging any more than it is now.

RetriableStream.java (inline diff):

    Runnable runnable = commit(substream);
    if (runnable != null) {
      runnable.run();
      // substream.stream.cancel(reason);
jtk54 (Author):

I'm at a bit of a loss as to how to proceed here.

This is an attempt at committing an in-progress stream (please correct if it's wrong) rather than the noop stream for non-hedging retries. Committing an in-progress stream seems to prevent the committed stream from being cancelled, which breaks the contract with ManagedChannelImpl. Adding the cancel after the commit (i.e. uncommenting this line) fixes the contract but prevents the flush and causes the buffer leak.

Am I missing something about the usage here, or is this not possible without a larger rework for retriable streams? Thanks for the patience in any case.

Member:

> Adding the cancel after the commit (i.e. uncommenting this line) fixes the contract but prevents the flush and causes the buffer leak.

Only because you added framer().dispose(); to cancel. If we keep the "cancel doesn't impact the framer," then this would still work.

I know it seems really strange that we can avoid a memory leak by not closing a resource. But we're essentially modeling this as "that's the other thread's problem," which has some wisdom in it.

jtk54 (Author):

Thanks. I tried removing the framer().dispose() from cancel, but the leak still occurs if we keep the stream.cancel(), and if we omit it we break the ManagedChannelImpl + DelayedClientTransport contract (see ./gradlew :grpc-core:test --tests io.grpc.internal.ManagedChannelImplIdlenessTest.newCallRefreshesIdlenessTimer). To be clear: we should be cancelling the committed stream here, right?

Do I need to move the framer().dispose() call to another location to flush and free the buffer if we keep the cancel?

RetriableStream.java (inline diff):

        return;
      }
    } else {
      Optional<Substream> substreamToCommit;
Member:

We can't use Optional. We support Android versions that don't have it. Ditto for stream(). We'd use Iterables.getFirst(state.drainedSubstreams, null). AnimalSniffer should catch these things.
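
A minimal sketch of the Android-safe form (Guava's Iterables is available to grpc-core; Substream and state.drainedSubstreams are the names used in the diff above):

    // Instead of Optional/stream(), which aren't on the minimum supported
    // Android API level, use Guava:
    Substream substreamToCommit = Iterables.getFirst(state.drainedSubstreams, null);
    if (substreamToCommit != null) {
      // commit this in-progress substream instead of the noop substream
    }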


RetriableStream.java (inline diff):

    if (runnable != null) {
      synchronized (lock) {
        state = state.substreamDrained(noopSubstream);
        // Handle everything the same way, e.g. isHedging vs not.
jtk54 (Author):

This strategy, plus attempting to brute-force flush in drain() (even unconditionally, as the code is now), is still apparently not enough to fix the leak and get rid of the explicit flush after writes. I think this is a reasonable attempt at what we discussed offline; am I missing anything obvious?
