Skip to content

Commit

Permalink
Hold on to the worker executor service in RepoFetching
Browse files Browse the repository at this point in the history
I managed to reproduce some deadlocks during repo fetching with virtual worker threads. One notable trigger was some _other_ repo failing to fetch, which seems to cause Skyframe to try to interrupt other concurrent repo fetches. This _might_ be the cause for a deadlock where we submit a task to the worker executor service, but the task never starts running before it gets cancelled, which causes us to wait forever for a `DONE` signal that never comes. (The worker task puts a `DONE` signal in the queue in a `finally` block -- but we don't even enter the `try`.)

I then tried various things to fix this; this PR is an attempt that actually seemed to eliminate the deadlock. Instead of waiting for a `DONE` signal to make sure the worker thread has finished, we now hold on to the executor service, which offers a `close()` method that essentially uninterruptibly waits for any scheduled tasks to terminate, whether or not they have started running. (@justinhorvitz had suggested a similar idea before.) To make sure distinct repo fetches don't interfere with each other, we start a separate worker executor service for each repo fetch instead of making everyone share the same worker executor service. (This is recommended for virtual threads; see https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-C0FEE349-D998-4C9D-B032-E01D06BE55F2 for example.)

Related: #22003
  • Loading branch information
Wyverald committed Apr 23, 2024
1 parent 8105bc0 commit a3f7e59
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
import com.google.devtools.build.lib.analysis.RuleDefinition;
Expand Down Expand Up @@ -63,6 +62,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
Expand All @@ -78,7 +78,6 @@
import com.google.devtools.build.lib.cmdline.RepositoryName;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.pkgcache.PackageOptions;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.rules.repository.LocalRepositoryFunction;
import com.google.devtools.build.lib.rules.repository.LocalRepositoryRule;
import com.google.devtools.build.lib.rules.repository.NewLocalRepositoryFunction;
Expand Down Expand Up @@ -121,9 +120,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -163,9 +159,6 @@ public class BazelRepositoryModule extends BlazeModule {
private List<String> allowedYankedVersions = ImmutableList.of();
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool =
Executors.newFixedThreadPool(
100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());

@Nullable private CredentialModule credentialModule;

Expand Down Expand Up @@ -315,39 +308,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
case AUTO:
try {
// Since Google hasn't migrated to JDK 21 yet, we can't directly call
// Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
// anyone... right? (OSS Bazel already ships with a bundled JDK 21)
starlarkRepositoryFunction.setWorkerExecutorService(
(ExecutorService)
Executors.class
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(
null,
Profiler.instance()
.profileableVirtualThreadFactory("starlark-repository-")));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
} else {
throw new AbruptExitException(
detailedExitCode(
"couldn't create virtual worker thread executor for repo fetching",
Code.BAD_DOWNLOADER_CONFIG),
e);
}
}
}
starlarkRepositoryFunction.setUseWorkers(
repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,8 @@ public Converter() {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
+ " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
+ " otherwise no worker thread is used.")
+ " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+ " thread.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -71,6 +74,13 @@ enum Signal {
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it's the only reliable way to make sure the
// worker thread has shut down (the `Future` class doesn't have the capability).
final ExecutorService workerExecutorService =
Executors.newThreadPerTaskExecutor(
Profiler.instance().profileableVirtualThreadFactory("starlark-repository-"));

/**
* This is where the recorded inputs & values for the whole invocation is collected.
*
Expand All @@ -92,4 +102,12 @@ public void close() {
myWorkerFuture.cancel(true);
}
}

public void closeAndWaitForTermination() throws InterruptedException {
close();
workerExecutorService.close(); // This blocks
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.RuleDefinition;
import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent;
Expand Down Expand Up @@ -59,7 +58,7 @@
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nullable;
import net.starlark.java.eval.EvalException;
import net.starlark.java.eval.Mutability;
Expand All @@ -73,7 +72,7 @@
public final class StarlarkRepositoryFunction extends RepositoryFunction {
private final DownloadManager downloadManager;
private double timeoutScaling = 1.0;
@Nullable private ExecutorService workerExecutorService = null;
private boolean useWorkers;
@Nullable private ProcessWrapper processWrapper = null;
@Nullable private RepositoryRemoteExecutor repositoryRemoteExecutor;
@Nullable private SyscallCache syscallCache;
Expand All @@ -94,15 +93,15 @@ public void setSyscallCache(SyscallCache syscallCache) {
this.syscallCache = checkNotNull(syscallCache);
}

public void setWorkerExecutorService(@Nullable ExecutorService workerExecutorService) {
this.workerExecutorService = workerExecutorService;
public void setUseWorkers(boolean useWorkers) {
this.useWorkers = useWorkers;
}

@Override
protected void setupRepoRootBeforeFetching(Path repoRoot) throws RepositoryFunctionException {
// DON'T delete the repo root here if we're using a worker thread, since when this SkyFunction
// restarts, fetching is still happening inside the worker thread.
if (workerExecutorService == null) {
if (!useWorkers) {
setupRepoRoot(repoRoot);
}
}
Expand All @@ -111,7 +110,7 @@ protected void setupRepoRootBeforeFetching(Path repoRoot) throws RepositoryFunct
public void reportSkyframeRestart(Environment env, RepositoryName repoName) {
// DON'T report a "restarting." event if we're using a worker thread, since the actual fetch
// function run by the worker thread never restarts.
if (workerExecutorService == null) {
if (!useWorkers) {
super.reportSkyframeRestart(env, repoName);
}
}
Expand All @@ -126,8 +125,7 @@ public RepositoryDirectoryValue.Builder fetch(
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key)
throws RepositoryFunctionException, InterruptedException {
if (workerExecutorService == null
|| env.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors()) {
if (!useWorkers || env.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors()) {
// Don't use the worker thread if we're in Skyframe error bubbling. For some reason, using a
// worker thread during error bubbling very frequently causes deadlocks on Linux platforms.
// The deadlock is rather elusive and this is just the immediate thing that seems to help.
Expand All @@ -143,21 +141,27 @@ public RepositoryDirectoryValue.Builder fetch(
// clean slate, and create the worker.
setupRepoRoot(outputDirectory);
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
workerFuture =
workerExecutorService.submit(
() -> {
try {
return fetchInternal(
rule,
outputDirectory,
directories,
workerEnv,
state.recordedInputValues,
key);
} finally {
state.signalQueue.put(Signal.DONE);
}
});
try {
workerFuture =
state.workerExecutorService.submit(
() -> {
try {
return fetchInternal(
rule,
outputDirectory,
directories,
workerEnv,
state.recordedInputValues,
key);
} finally {
state.signalQueue.put(Signal.DONE);
}
});
} catch (RejectedExecutionException e) {
// This means that the worker executor service is already shut down. Fall back to not using
// worker threads.
return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key);
}
state.workerFuture = workerFuture;
} else {
// A worker is already running. This can only mean one thing -- we just had a Skyframe
Expand All @@ -168,23 +172,18 @@ public RepositoryDirectoryValue.Builder fetch(
try {
signal = state.signalQueue.take();
} catch (InterruptedException e) {
// This means that we caught a Ctrl-C. Make sure to close the state object to interrupt the
// worker thread, wait for it to finish, and then propagate the InterruptedException.
state.close();
signal = Uninterruptibles.takeUninterruptibly(state.signalQueue);
// The call to Uninterruptibles.takeUninterruptibly() above may set the thread interrupted
// status if it suppressed an InterruptedException, so we clear it again.
Thread.interrupted();
throw new InterruptedException();
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.closeAndWaitForTermination();
throw e;
}
switch (signal) {
case RESTART:
return null;
case DONE:
return switch (signal) {
case RESTART -> null;
case DONE -> {
try {
RepositoryDirectoryValue.Builder result = workerFuture.get();
recordedInputValues.putAll(state.recordedInputValues);
return result;
yield result;
} catch (ExecutionException e) {
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e.getCause());
Expand All @@ -198,17 +197,16 @@ public RepositoryDirectoryValue.Builder fetch(
RepositoryFetchProgress.ongoing(
RepositoryName.createUnvalidated(rule.getName()),
"fetch interrupted due to memory pressure; restarting."));
return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
yield fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
} finally {
// At this point, the worker thread has definitely finished. But in some corner cases (see
// b/330892334), a Skyframe restart might still happen; to ensure we're not tricked into
// a deadlock, we clean up the worker thread and so that next time we come into fetch(),
// we actually restart the entire computation.
state.close();
state.closeAndWaitForTermination();
}
}
// TODO(wyv): use a switch expression above instead and remove this.
throw new IllegalStateException();
}
};
}

@Nullable
Expand Down

0 comments on commit a3f7e59

Please sign in to comment.