New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BufferTimeout with fair backpressure rework #3634
base: 3.5.x
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue sits deeper and as I suggested first requires complete rework.
To reproduce the issue try to modify discard setup as follows:
@Tag("slow")
public class OnDiscardShouldNotLeakTest {
private static final int NB_ITERATIONS = 100_000;
// add DiscardScenarios here to test more operators
private static final DiscardScenario[] SCENARIOS = new DiscardScenario[] {
DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofNanos(1), true).flatMapIterable(Function.identity())),
};
...
the problem appears when multiple threads comes into play. If you have race between scheduled timeout task trying to flash the window and cancellation ,then some elements might be undischarged
Although the above could be false negative, since the async task can be unawaited properly. Can you please doublecheck @chemicL |
I just remembered that it was one of the reasons why all the discard tests are in stress tests, therefore it could make sense to port part of them for bufferTimeout |
@OlegDokuka the commit 0334959 adds significant improvements to the test suite that helps catch the racy situations. In the next commit I will add temporary fixes for the identified issues, but afterwards will follow up with a state machine implementation to eliminate the last one with timeout racing with draining. |
The fair backpressure variant of the
bufferTimeout
operator has been reworked to use a state machine with a minimum number of volatile variables eliminating potential data races, such as skipping the delivery whenonNext
and timeout happen concurrently or cancellation happens whileonNext
is delivered, etc.Resolves #3531