Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of Scheduler#disposeGracefully threads #3259

Merged
merged 10 commits into from
Nov 4, 2022
1 change: 1 addition & 0 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ blockHoundTest {
jcstress {
mode = 'quick' //quick, default, tough
jcstressDependency 'org.openjdk.jcstress:jcstress-core:0.15'
heapPerFork = 512
}

// inherit basic test task + common configuration in root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.openjdk.jcstress.infra.results.IIZ_Result;
import org.openjdk.jcstress.infra.results.Z_Result;

public abstract class SchedulersStressTest {
public abstract class BasicSchedulersStressTest {

private static void restart(Scheduler scheduler) {
scheduler.disposeGracefully().block(Duration.ofMillis(100));
scheduler.disposeGracefully().block(Duration.ofMillis(500));
// TODO: in 3.6.x: remove restart capability and this validation
scheduler.start();
}
Expand Down Expand Up @@ -112,36 +112,6 @@ public void arbiter(Z_Result r) {
}
}

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class BoundedElasticSchedulerStartDisposeStressTest {

private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(1, 1, Thread::new, 5);
{
scheduler.init();
}

@Actor
public void restart1() {
restart(scheduler);
}

@Actor
public void restart2() {
restart(scheduler);
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
Expand All @@ -157,13 +127,13 @@ public static class SingleSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
simonbasle marked this conversation as resolved.
Show resolved Hide resolved
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
r.r2 = scheduler.state.initialResource.hashCode();
}

Expand All @@ -174,6 +144,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -193,49 +168,13 @@ public static class ParallelSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyStressTest {

private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
r.r2 = scheduler.state.initialResource.hashCode();
}

Expand All @@ -246,6 +185,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -264,7 +208,7 @@ public static class SingleSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
r.r1 = scheduler.state.initialResource.hashCode();
}

Expand All @@ -281,6 +225,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -300,44 +249,7 @@ public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void dispose(IIZ_Result r) {
scheduler.dispose();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"disposeGracefully, eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyAndDisposeStressTest {


private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().block();
r.r1 = scheduler.state.initialResource.hashCode();
}

Expand All @@ -354,6 +266,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void start() {

@Override
public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit)
throws InterruptedException {
throws InterruptedException {
simonbasle marked this conversation as resolved.
Show resolved Hide resolved
if (!boundedServices.evictor.awaitTermination(timeout, timeUnit)) {
return false;
}
Expand Down Expand Up @@ -508,7 +508,7 @@ private BoundedServices() {
this.clock = parent.clock;
this.idleQueue = new ConcurrentLinkedDeque<>();
this.busyStates = ALL_IDLE;
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public boolean isDisposed() {

@Override
public boolean await(ScheduledExecutorService resource, long timeout, TimeUnit timeUnit)
throws InterruptedException {
throws InterruptedException {
return resource.awaitTermination(timeout, timeUnit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void init() {
);
}

this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction,
ttlSeconds,
ttlSeconds,
Expand All @@ -124,7 +124,7 @@ public void start() {
if (!shutdown) {
return;
}
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction,
ttlSeconds,
ttlSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ public void start() {
}
}

@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
for (ScheduledExecutorService executor : resource) {
if (!executor.awaitTermination(timeout, timeUnit)) {
return false;
}
}
return true;
}
@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
for (ScheduledExecutorService executor : resource) {
if (!executor.awaitTermination(timeout, timeUnit)) {
return false;
}
}
return true;
}

@Override
public void dispose() {
Expand Down