Skip to content

Commit

Permalink
Drain SinkManyEmitterProcessor buffer after cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
bajibalu committed Apr 18, 2024
1 parent cc4955f commit 3ad4a0b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Expand Up @@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) {
}

final void drain() {
if (WIP.getAndIncrement(this) != 0) {
if (WIP.get(this) != 0) {
return;
}

WIP.getAndIncrement(this);

int missed = 1;

for (; ; ) {
Expand All @@ -398,6 +400,7 @@ final void drain() {
boolean empty = q == null || q.isEmpty();

if (checkTerminated(d, empty)) {
WIP.addAndGet(this, -missed);
return;
}

Expand Down Expand Up @@ -432,6 +435,7 @@ final void drain() {
v = null;
}
if (checkTerminated(d, v == null)) {
WIP.addAndGet(this, -missed);
return;
}
if (sourceMode != Fuseable.SYNC) {
Expand Down Expand Up @@ -459,6 +463,7 @@ final void drain() {
empty = v == null;

if (checkTerminated(d, empty)) {
WIP.addAndGet(this, -missed);
return;
}

Expand Down Expand Up @@ -600,6 +605,7 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
if (q != null) {
q.clear();
}
WIP.decrementAndGet(this);
}
return;
}
Expand Down
Expand Up @@ -921,4 +921,22 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() {
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
public void cancelledSinkClearsQueue() {
SinkManyEmitterProcessor<Integer> sinkManyEmitterProcessor = new SinkManyEmitterProcessor<>(true, 1);
// fill the buffer
assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK);
StepVerifier.create(sinkManyEmitterProcessor)
.expectNext(1)
.expectTimeout(Duration.ofSeconds(1))
.verify();
sinkManyEmitterProcessor.asFlux().subscribe().dispose();

// fill the buffer
assertThat(sinkManyEmitterProcessor.tryEmitNext(1)).as("filling buffer").isEqualTo(Sinks.EmitResult.OK);
StepVerifier.create(sinkManyEmitterProcessor)
.verifyComplete();
assertThat(sinkManyEmitterProcessor.queue.isEmpty()).as("Buffer should be empty").isTrue();
}
}

0 comments on commit 3ad4a0b

Please sign in to comment.