Skip to content

Commit

Permalink
Drain SinkManyEmitterProcessor buffer after cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
bajibalu committed Apr 18, 2024
1 parent cc4955f commit d7e3d7d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Expand Up @@ -384,6 +384,7 @@ public Object scanUnsafe(Attr key) {

final void drain() {
if (WIP.getAndIncrement(this) != 0) {
WIP.decrementAndGet(this);
return;
}

Expand All @@ -398,6 +399,7 @@ final void drain() {
boolean empty = q == null || q.isEmpty();

if (checkTerminated(d, empty)) {
WIP.addAndGet(this, -missed);
return;
}

Expand Down Expand Up @@ -432,6 +434,7 @@ final void drain() {
v = null;
}
if (checkTerminated(d, v == null)) {
WIP.addAndGet(this, -missed);
return;
}
if (sourceMode != Fuseable.SYNC) {
Expand Down Expand Up @@ -459,6 +462,7 @@ final void drain() {
empty = v == null;

if (checkTerminated(d, empty)) {
WIP.addAndGet(this, -missed);
return;
}

Expand Down Expand Up @@ -593,13 +597,15 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
//happens when the removed inner makes the subscribers array EMPTY
if (autoCancel && b == EMPTY && Operators.terminate(S, this)) {
if (WIP.getAndIncrement(this) != 0) {
WIP.decrementAndGet(this);
return;
}
terminate();
Queue<T> q = queue;
if (q != null) {
q.clear();
}
WIP.decrementAndGet(this);
}
return;
}
Expand Down
Expand Up @@ -921,4 +921,22 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() {
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
public void cancelledSinkClearsQueue() {
SinkManyEmitterProcessor<Integer> sinkManyEmitterProcessor = new SinkManyEmitterProcessor<>(true, 1);
// fill the buffer
assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK);
StepVerifier.create(sinkManyEmitterProcessor)
.expectNext(1)
.expectTimeout(Duration.ofSeconds(1))
.verify();
sinkManyEmitterProcessor.asFlux().subscribe().dispose();

// fill the buffer
assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK);
StepVerifier.create(sinkManyEmitterProcessor)
.verifyComplete();
assertThat(sinkManyEmitterProcessor.queue.isEmpty()).as("Buffer should be empty").isTrue();
}
}

0 comments on commit d7e3d7d

Please sign in to comment.