Skip to content

Commit

Permalink
Avoid reentrancy in ExecutionSequencer.
Browse files Browse the repository at this point in the history
RELNOTES=n/a

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=317189958
  • Loading branch information
clm authored and nick-someone committed Jun 22, 2020
1 parent ac0c342 commit 6cc0bb9
Show file tree
Hide file tree
Showing 5 changed files with 826 additions and 44 deletions.
Expand Up @@ -15,11 +15,16 @@
package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.allAsList;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.testing.GcFinalization;
import com.google.common.testing.TestLogHandler;
import com.google.j2objc.annotations.J2ObjCIncompatible;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -146,6 +151,36 @@ public Boolean call() {
assertThat(getDone(future2)).isFalse();
}

@GwtIncompatible
@J2ObjCIncompatible // gc
@AndroidIncompatible
public void testCancellationWithReferencedObject() throws Exception {
Object toBeGCed = new Object();
WeakReference<Object> ref = new WeakReference<>(toBeGCed);
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<?> ignored =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
toBeGCed = null;
GcFinalization.awaitClear(ref);
}

private static Callable<String> toStringCallable(final Object object) {
return new Callable<String>() {
@Override
public String call() {
return object.toString();
}
};
}

public void testCancellationDuringReentrancy() throws Exception {
TestLogHandler logHandler = new TestLogHandler();
Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
Expand Down Expand Up @@ -191,6 +226,171 @@ public Void call() {
assertThat(logHandler.getStoredLogRecords()).isEmpty();
}

public void testAvoidsStackOverflow_manySubmitted() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
results.add(
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor()));
for (int i = 0; i < 50_000; i++) {
results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
}
settableFuture.set(null);
getDone(allAsList(results));
}

public void testAvoidsStackOverflow_manyCancelled() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<Void> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 50_000; i++) {
serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
}
ListenableFuture<Integer> stackDepthCheck =
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor());
settableFuture.set(null);
assertThat(getDone(stackDepthCheck))
.isLessThan(Thread.currentThread().getStackTrace().length + 100);
}

public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<Void> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 25_000; i++) {
serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
}
ListenableFuture<Integer> stackDepthCheck =
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor());
settableFuture.set(null);
assertThat(getDone(stackDepthCheck))
.isLessThan(Thread.currentThread().getStackTrace().length + 100);
}

private static final class LongHolder {
long count;
}

private static final int ITERATION_COUNT = 50_000;
private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;

@GwtIncompatible // threads

public void testAvoidsStackOverflow_multipleThreads() throws Exception {
final LongHolder holder = new LongHolder();
final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
final List<Integer> completeLengthChecks;
final int baseStackDepth;
ExecutorService service = Executors.newFixedThreadPool(5);
try {
// Avoid counting frames from the executor itself, or the ExecutionSequencer
baseStackDepth =
serializer
.submit(
new Callable<Integer>() {
@Override
public Integer call() {
return Thread.currentThread().getStackTrace().length;
}
},
service)
.get();
final SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<?> unused =
serializer.submitAsync(
new AsyncCallable<Void>() {
@Override
public ListenableFuture<Void> call() {
return settableFuture;
}
},
directExecutor());
for (int i = 0; i < 50_000; i++) {
if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
// after some number of iterations, switch threads
unused =
serializer.submit(
new Callable<Void>() {
@Override
public Void call() {
holder.count++;
return null;
}
},
service);
} else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
// When at max depth, record stack trace depth
lengthChecks.add(
serializer.submit(
new Callable<Integer>() {
@Override
public Integer call() {
holder.count++;
return Thread.currentThread().getStackTrace().length;
}
},
directExecutor()));
} else {
// Otherwise, schedule a task on directExecutor
unused =
serializer.submit(
new Callable<Void>() {
@Override
public Void call() {
holder.count++;
return null;
}
},
directExecutor());
}
}
settableFuture.set(null);
completeLengthChecks = allAsList(lengthChecks).get();
} finally {
service.shutdown();
}
assertThat(holder.count).isEqualTo(ITERATION_COUNT);
for (int length : completeLengthChecks) {
// Verify that at max depth, less than one stack frame per submitted task was consumed
assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
}
}

public void testToString() {
Future<?> first = serializer.submitAsync(firstCallable, directExecutor());
TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
Expand Down

0 comments on commit 6cc0bb9

Please sign in to comment.