Skip to content

Commit

Permalink
Limit the number of Scheduler#disposeGracefully threads (#3259)
Browse files Browse the repository at this point in the history
This change introduces a `DisposeAwaiterRunnable` with a small pool of
threads dedicated to polling the termination status after a graceful
Scheduler shutdown.

Previously, one Thread would be created for each Scheduler that is
disposed gracefully. While we don't expect this to be an issue in most
production applications, this can lead to hitting native thread limits
faster. Notably, stress tests around graceful disposal create a lot of
schedulers for that purpose.

This change also ensures that the evictor executorServices of both the
BoundedElasticScheduler and ElasticScheduler are limited to at most 1
thread.

Finally, it attempts to improve the SchedulersStressTest to avoid the
OOMs as much as possible: block on disposeGracefully() calls, increase
the heap of forked JVMs for jcstress, and ultimately stop covering the
BoundedElasticScheduler in the stress test.

Fixes #3258.
  • Loading branch information
simonbasle committed Nov 4, 2022
1 parent 2ecf52b commit 9fe3241
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 152 deletions.
1 change: 1 addition & 0 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ blockHoundTest {
jcstress {
mode = 'quick' //quick, default, tough
jcstressDependency 'org.openjdk.jcstress:jcstress-core:0.15'
heapPerFork = 512
}

// inherit basic test task + common configuration in root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
Expand All @@ -29,10 +30,10 @@
import org.openjdk.jcstress.infra.results.IIZ_Result;
import org.openjdk.jcstress.infra.results.Z_Result;

public abstract class SchedulersStressTest {
public abstract class BasicSchedulersStressTest {

private static void restart(Scheduler scheduler) {
scheduler.disposeGracefully().block(Duration.ofMillis(100));
scheduler.disposeGracefully().block(Duration.ofMillis(500));
// TODO: in 3.6.x: remove restart capability and this validation
scheduler.start();
}
Expand Down Expand Up @@ -112,43 +113,14 @@ public void arbiter(Z_Result r) {
}
}

/**
 * Stress test racing two concurrent restarts of a single {@code BoundedElasticScheduler}.
 * Both actors invoke {@code restart(scheduler)} on the same instance; the arbiter then
 * records whether a task can still be scheduled and finally disposes the scheduler.
 * The only acceptable outcome declared below is {@code true}.
 */
@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class BoundedElasticSchedulerStartDisposeStressTest {

// NOTE(review): arguments (1, 1, Thread::new, 5) are presumably thread cap, queued-task cap,
// thread factory and TTL in seconds -- confirm against the BoundedElasticScheduler constructor.
private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(1, 1, Thread::new, 5);
{
// Instance initializer: every fresh state object starts with an initialized scheduler.
scheduler.init();
}

// First of the two racing actors.
@Actor
public void restart1() {
restart(scheduler);
}

// Second racing actor, identical to restart1.
@Actor
public void restart2() {
restart(scheduler);
}

// Runs after both actors; stores the scheduling check in r.r1, then releases the scheduler.
@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"eventually disposed.")
@State
public static class SingleSchedulerDisposeGracefullyStressTest {

private final CountDownLatch latch = new CountDownLatch(2);
private final SingleScheduler scheduler = new SingleScheduler(Thread::new);

{
Expand All @@ -157,23 +129,34 @@ public static class SingleSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

/**
 * Arbiter run after both {@code disposeGracefully} actors.
 * <p>
 * Waits up to 5 seconds for the two graceful disposals to count the latch down, then
 * stores in {@code r.r3} whether both actors observed the same resource (equal
 * {@code r.r1}/{@code r.r2}) and the scheduler reports itself as disposed.
 *
 * @param r the shared result holder; {@code r1}/{@code r2} are written by the actors,
 * {@code r3} is written here
 */
@Arbiter
public void arbiter(IIZ_Result r) {
    try {
        // The boolean result is intentionally not inspected: if the wait times out,
        // the isDisposed() check below is what decides the outcome.
        latch.await(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag before propagating, so the interruption is not lost.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    // Validate both disposals left the Scheduler in consistent state,
    // assuming the await process coordinates on the resources as identified
    // by r.r1 and r.r2, which should be equal.
    boolean consistentState = r.r1 == r.r2;
    r.r3 = consistentState && scheduler.isDisposed();
    if (consistentState) {
        // When that condition is true, we erase the r1/r2 state. That should greatly limit
        // the output of "interesting acceptable state" in the dump should an error occur.
        r.r1 = r.r2 = 0;
    }
}
}

Expand All @@ -184,6 +167,7 @@ public void arbiter(IIZ_Result r) {
@State
public static class ParallelSchedulerDisposeGracefullyStressTest {

private final CountDownLatch latch = new CountDownLatch(2);
private final ParallelScheduler scheduler =
new ParallelScheduler(10, Thread::new);

Expand All @@ -193,59 +177,34 @@ public static class ParallelSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyStressTest {

private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

/**
 * Arbiter run after both {@code disposeGracefully} actors.
 * <p>
 * Waits up to 5 seconds for the two graceful disposals to count the latch down, then
 * stores in {@code r.r3} whether both actors observed the same resource (equal
 * {@code r.r1}/{@code r.r2}) and the scheduler reports itself as disposed.
 *
 * @param r the shared result holder; {@code r1}/{@code r2} are written by the actors,
 * {@code r3} is written here
 */
@Arbiter
public void arbiter(IIZ_Result r) {
    try {
        // The boolean result is intentionally not inspected: if the wait times out,
        // the isDisposed() check below is what decides the outcome.
        latch.await(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag before propagating, so the interruption is not lost.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    // Validate both disposals left the Scheduler in consistent state,
    // assuming the await process coordinates on the resources as identified
    // by r.r1 and r.r2, which should be equal.
    boolean consistentState = r.r1 == r.r2;
    r.r3 = consistentState && scheduler.isDisposed();
    if (consistentState) {
        // When that condition is true, we erase the r1/r2 state. That should greatly limit
        // the output of "interesting acceptable state" in the dump should an error occur.
        r.r1 = r.r2 = 0;
    }
}
}

Expand All @@ -264,8 +223,15 @@ public static class SingleSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
final CountDownLatch latch = new CountDownLatch(1);
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Actor
Expand All @@ -281,6 +247,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should an error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -300,45 +271,15 @@ public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void dispose(IIZ_Result r) {
scheduler.dispose();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"disposeGracefully, eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyAndDisposeStressTest {


private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
final CountDownLatch latch = new CountDownLatch(1);
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Actor
Expand All @@ -354,6 +295,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should an error occur
r.r1 = r.r2 = 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ public void start() {
}

@Override
public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit)
throws InterruptedException {
public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit) throws InterruptedException {
if (!boundedServices.evictor.awaitTermination(timeout, timeUnit)) {
return false;
}
Expand Down Expand Up @@ -508,7 +507,7 @@ private BoundedServices() {
this.clock = parent.clock;
this.idleQueue = new ConcurrentLinkedDeque<>();
this.busyStates = ALL_IDLE;
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public boolean isDisposed() {

@Override
public boolean await(ScheduledExecutorService resource, long timeout, TimeUnit timeUnit)
throws InterruptedException {
throws InterruptedException {
return resource.awaitTermination(timeout, timeUnit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void init() {
);
}

this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction,
ttlSeconds,
ttlSeconds,
Expand All @@ -124,7 +124,7 @@ public void start() {
if (!shutdown) {
return;
}
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction,
ttlSeconds,
ttlSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ public void start() {
}
}

@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
for (ScheduledExecutorService executor : resource) {
if (!executor.awaitTermination(timeout, timeUnit)) {
return false;
}
}
return true;
}
/**
 * Waits for every executor in {@code resource} to terminate, within one overall timeout.
 * <p>
 * The timeout is converted to a single deadline shared by all executors, so the total
 * time spent here is bounded by {@code timeout} rather than by {@code timeout} multiplied
 * by the number of executors. Executors that are already terminated still report success
 * once the deadline has passed, since {@code awaitTermination} with a non-positive
 * timeout returns the current termination status without blocking.
 *
 * @param resource the executors to wait for
 * @param timeout the maximum overall time to wait
 * @param timeUnit the unit of {@code timeout}
 * @return {@code true} if all executors terminated in time, {@code false} as soon as one did not
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
    // nanoTime arithmetic is overflow-safe as long as only differences are compared.
    final long deadlineNanos = System.nanoTime() + timeUnit.toNanos(timeout);
    for (ScheduledExecutorService executor : resource) {
        final long remainingNanos = deadlineNanos - System.nanoTime();
        if (!executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS)) {
            return false;
        }
    }
    return true;
}

@Override
public void dispose() {
Expand Down

0 comments on commit 9fe3241

Please sign in to comment.