Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drain SinkManyEmitterProcessor buffer after cancel #3789

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

bajibalu
Copy link

The internal buffer/queue in SinkManyEmitterProcessor will be drained after all the subscriptions are canceled.

As explained here the queue/buffer in SinkManyEmitterProcessor is not drained properly after the last subscriber canceled the subscription. This was happening due to the WIP marker is left in an unclean state. This PR fixes the issue by updating the WIP marker. I am not sure if this is the ideal approach though.

Fixes #3715

@@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) {
}

final void drain() {
if (WIP.getAndIncrement(this) != 0) {
if (WIP.get(this) != 0) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the cases where more than 1 thread calling the drain function, all the threads are incrementing the WIP counter while only one of them proceeds to clear the queue and decrements the counter. This leaves the WIP in unclean state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By separating the atomic operation into two operations you break the exclusive access to the critical section below. Please start with creating JCStress tests if you're considering to spend more time on this. These drain methods are quite a critical piece and ensuring proper lock-free coordination between different actors is essential. Another aspect is performance so that we make as little volatile accesses as we can.

@@ -398,6 +400,7 @@ final void drain() {
boolean empty = q == null || q.isEmpty();

if (checkTerminated(d, empty)) {
WIP.addAndGet(this, -missed);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, these returns also leave the WIP in an unclean state.

@@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) {
}

final void drain() {
if (WIP.getAndIncrement(this) != 0) {
if (WIP.get(this) != 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By separating the atomic operation into two operations you break the exclusive access to the critical section below. Please start with creating JCStress tests if you're considering to spend more time on this. These drain methods are quite a critical piece and ensuring proper lock-free coordination between different actors is essential. Another aspect is performance so that we make as little volatile accesses as we can.

@bajibalu
Copy link
Author

bajibalu commented Apr 27, 2024

@chemicL I addressed your comments and also added a JCStress test. But I am not sure whether the test case is correct.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Auto-cancelled Sink still accepts emissions
2 participants