Use a Semaphore for signaling in repo fetching
I managed to reproduce some deadlocks during repo fetching with virtual worker threads. One notable trigger was some _other_ repo failing to fetch, which seems to cause Skyframe to try to interrupt other concurrent repo fetches. This _might_ be the cause for a deadlock where we submit a task to the worker executor service, but the task never starts running before it gets cancelled, which causes us to wait forever for a `DONE` signal that never comes. (The worker task puts a `DONE` signal in the queue in a `finally` block -- but we don't even enter the `try`.)
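
The "we don't even enter the `try`" failure mode can be reproduced outside Bazel. The following is a minimal sketch, not the Bazel code: it uses a single-threaded platform executor (an assumption made purely to force the task to sit in the queue deterministically), and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Bazel.
class CancelBeforeStartDemo {
  /** Returns whether the worker's finally block ran after it was cancelled while still queued. */
  static boolean finallyRanAfterEarlyCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch blockerStarted = new CountDownLatch(1);
    CountDownLatch releaseBlocker = new CountDownLatch(1);
    AtomicBoolean finallyRan = new AtomicBoolean(false);
    try {
      // Occupy the only thread so that the next submitted task stays queued.
      executor.submit(
          () -> {
            blockerStarted.countDown();
            releaseBlocker.await();
            return null;
          });
      blockerStarted.await();

      // The "worker": it would signal DONE from its finally block.
      Future<?> worker =
          executor.submit(
              () -> {
                try {
                  Thread.yield(); // stand-in for the real fetch work
                } finally {
                  finallyRan.set(true); // stand-in for "put DONE in the queue"
                }
              });

      // Cancel before the worker ever gets a thread, then let the executor drain.
      worker.cancel(true);
      releaseBlocker.countDown();
      executor.shutdown();
      executor.awaitTermination(10, TimeUnit.SECONDS);
      return finallyRan.get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("finally ran: " + finallyRanAfterEarlyCancel()); // prints "finally ran: false"
  }
}
```

Because the cancelled task's body is never executed, anything waiting for a signal sent from inside that body waits forever.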

This PR improves the situation in various ways:

1. Instead of using a `SynchronousQueue` for the signal queue, we now use a Semaphore for signaling. Semaphores have the crucial property that releasing a permit (i.e. incrementing the counter) does not block, and thus cannot be interrupted. This means that the worker thread can now reliably send signals to the host thread, even when it's interrupted.

2. Instead of using two signals for `DONE` and `RESTART`, we just use the one semaphore for both signals, and rely on `workerFuture.isDone()` to tell whether the worker has finished or is waiting for a fresh Environment.

3. Instead of signaling `DONE` in a `finally` block, we now use a `ListenableFuture` and signal to the semaphore in the worker future's listener. This makes sure that the signaling is performed _after_ the worker future's status changes, and safeguards against the case where the submitted task never starts running before it gets cancelled.

4. Instead of waiting for a `DONE` signal (or, in the new setup, the signal semaphore) to make sure the worker thread has finished, we now hold on to the executor service, which offers a `close()` method that essentially uninterruptibly waits for any scheduled tasks to terminate, whether or not they have started running. (@justinhorvitz had suggested a similar idea before.) To make sure distinct repo fetches don't interfere with each other, we start a separate worker executor service for each repo fetch instead of making everyone share the same worker executor service. (This is recommended for virtual threads; see https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-C0FEE349-D998-4C9D-B032-E01D06BE55F2 for example.)
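
Points 1 to 3 above can be illustrated with a small standalone sketch. It is not the code in this commit: to stay within the JDK it uses `FutureTask.done()` as a stand-in for the Guava `ListenableFuture` listener, and `SemaphoreSignalDemo` and `SignalingTask` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; not part of Bazel.
class SemaphoreSignalDemo {
  /** Returns true if put() on a SynchronousQueue fails when the thread is interrupted. */
  static boolean queuePutFailsWhenInterrupted() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    Thread.currentThread().interrupt();
    try {
      queue.put("DONE"); // blocking call: aborts with InterruptedException
      return false;
    } catch (InterruptedException e) {
      return true;
    } finally {
      Thread.interrupted(); // make sure the flag is cleared for later code
    }
  }

  /** Returns the permit count after releasing a permit with the interrupt flag set. */
  static int permitsAfterInterruptedRelease() {
    Semaphore signal = new Semaphore(0);
    Thread.currentThread().interrupt();
    signal.release(); // never blocks, so the pending interrupt cannot abort it
    Thread.interrupted(); // clear the flag again
    return signal.availablePermits();
  }

  /** A task that releases a permit once it is done: finished, failed, or cancelled. */
  static final class SignalingTask<V> extends FutureTask<V> {
    private final Semaphore signal;

    SignalingTask(Callable<V> callable, Semaphore signal) {
      super(callable);
      this.signal = signal;
    }

    @Override
    protected void done() {
      // Runs after the task's state has changed, even if the task body never ran.
      signal.release();
    }
  }

  /** Returns true if the signal arrives for a task cancelled before any thread ran it. */
  static boolean signalArrivesAfterEarlyCancel() {
    Semaphore signal = new Semaphore(0);
    SignalingTask<String> task = new SignalingTask<>(() -> "result", signal);
    task.cancel(true);
    // The host side: one semaphore for every signal, isDone() to tell the cases apart.
    return signal.tryAcquire() && task.isDone();
  }

  public static void main(String[] args) {
    System.out.println(queuePutFailsWhenInterrupted()); // prints "true"
    System.out.println(permitsAfterInterruptedRelease()); // prints "1"
    System.out.println(signalArrivesAfterEarlyCancel()); // prints "true"
  }
}
```

The design point is that the signal is tied to the future's state transition rather than to code inside the task body, so it is delivered whether or not the body ever runs.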

And because I now create a separate worker executor service for each repo fetch, it doesn't really make sense to use this for platform threads anymore. So setting `--experimental_worker_for_repo_fetching` to any value other than `off` will cause us to use virtual threads.
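
The "one executor per fetch, then wait for it to terminate" idea can be sketched as follows. This is an illustration under stated assumptions, not the Bazel implementation: it uses a cached platform thread pool so that it runs on JDKs older than 21, and it spells out the wait with `shutdownNow()` plus `awaitTermination()`, which approximates what `ExecutorService.close()` offers on newer JDKs. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Bazel.
class PerFetchExecutorDemo {
  /** Interrupts all tasks, then waits uninterruptibly until the executor has terminated. */
  static void shutdownAndWait(ExecutorService executor) {
    executor.shutdownNow();
    boolean interrupted = false;
    while (true) {
      try {
        if (executor.awaitTermination(1, TimeUnit.DAYS)) {
          break;
        }
      } catch (InterruptedException e) {
        interrupted = true; // remember the interrupt, but keep waiting
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // re-assert the interrupt for the caller
    }
  }

  /** Returns true if the worker had fully finished by the time shutdownAndWait returned. */
  static boolean workerFinishedBeforeReturn() {
    // One executor for this one "fetch"; other fetches would get their own.
    ExecutorService executor = Executors.newCachedThreadPool();
    CountDownLatch started = new CountDownLatch(1);
    AtomicBoolean cleanedUp = new AtomicBoolean(false);
    executor.submit(
        () -> {
          started.countDown();
          try {
            Thread.sleep(60_000); // stand-in for a long-running fetch
          } catch (InterruptedException e) {
            // Interrupted by shutdownNow(); fall through to cleanup.
          } finally {
            cleanedUp.set(true);
          }
        });
    try {
      started.await();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    shutdownAndWait(executor);
    return cleanedUp.get();
  }

  public static void main(String[] args) {
    System.out.println("worker finished: " + workerFinishedBeforeReturn()); // prints "worker finished: true"
  }
}
```

Since the executor is private to a single fetch, waiting for it to terminate cannot block on, or be blocked by, another repo's worker.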

Related: bazelbuild#22003

Fixes bazelbuild#21712.

Closes bazelbuild#22100.

PiperOrigin-RevId: 630534733
Change-Id: If989bf9cae76abb1579a2b1de896df8e5a63b88d
Wyverald authored and Kila2 committed May 13, 2024
1 parent 3a49c90 commit 7d08d55
Showing 4 changed files with 183 additions and 154 deletions.
@@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
import com.google.devtools.build.lib.analysis.RuleDefinition;
@@ -64,6 +63,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
@@ -121,9 +121,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -163,9 +160,6 @@ public class BazelRepositoryModule extends BlazeModule {
private List<String> allowedYankedVersions = ImmutableList.of();
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool =
Executors.newFixedThreadPool(
100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());

@Nullable private CredentialModule credentialModule;

@@ -316,37 +310,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
case AUTO:
try {
// Since Google hasn't migrated to JDK 21 yet, we can't directly call
// Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
// anyone... right? (OSS Bazel already ships with a bundled JDK 21)
starlarkRepositoryFunction.setWorkerExecutorService(
(ExecutorService)
Executors.class
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(
null, Thread.ofVirtual().name("starlark-repository-", 0).factory()));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
} else {
throw new AbruptExitException(
detailedExitCode(
"couldn't create virtual worker thread executor for repo fetching",
Code.BAD_DOWNLOADER_CONFIG),
e);
}
}
}
starlarkRepositoryFunction.setUseWorkers(
repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
@@ -246,10 +246,8 @@ public Converter() {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
+ " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
+ " otherwise no worker thread is used.")
+ " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+ " thread.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
@@ -14,52 +14,53 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/**
* Captures state that persists across different invocations of {@link
* com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
* StarlarkRepositoryFunction}.
*
* <p>This class is used to hold on to a worker thread (in reality just a {@link Future} object)
* when fetching repos using a worker thread is enabled. The worker thread uses a {@link
* SkyFunction.Environment} object acquired from the host thread, and can signal the host thread to
* restart to get a fresh environment object.
* <p>This class is used to hold on to a worker thread when fetching repos using a worker thread is
* enabled. The worker thread uses a {@link SkyFunction.Environment} object acquired from the host
* thread, and can signal the host thread to restart to get a fresh environment object.
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

/** A signal that the worker thread can send to the host Skyframe thread. */
enum Signal {
/**
* Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
* sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting
* for the host thread to send a fresh {@link SkyFunction.Environment} over.
*/
RESTART,
/**
* Indicates that the worker thread has finished running, either yielding a result or an
* exception.
*/
DONE
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/**
* A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
* (successfully or otherwise), or to indicate that the host thread should return {@code null},
* causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
* delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
* over.
*/
// A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
// be interrupted.
final Semaphore signalSemaphore = new Semaphore(0);

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
// We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
// gets interrupted before the host thread restarts, the host thread doesn't hang forever.
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new ArrayBlockingQueue<>(1);

/**
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
@@ -69,7 +70,14 @@ enum Signal {
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;
@Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
// sure the worker thread has shut down (with its blocking `close()` method).
ListeningExecutorService workerExecutorService;

private final String repoName;

/**
* This is where the recorded inputs & values for the whole invocation are collected.
@@ -79,17 +87,71 @@ enum Signal {
*/
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

RepoFetchingSkyKeyComputeState(String repoName) {
this.repoName = repoName;
reset();
}

// This may only be called from the host Skyframe thread, *and* only when no worker thread is
// running.
private void reset() {
workerExecutorService =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
signalSemaphore.drainPermits();
delegateEnvQueue.clear();
recordedInputValues.clear();
}

/**
* Releases a permit on the {@code signalSemaphore} and immediately expects a fresh Environment
* back. This may only be called from the worker thread.
*/
SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
signalQueue.put(Signal.RESTART);
signalSemaphore.release();
return delegateEnvQueue.take();
}

/**
* Starts a worker thread running the given callable. This sets the {@code workerFuture} field,
* and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes,
* successfully or otherwise. Returns the worker future. This may only be called from the host
* Skyframe thread.
*/
ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
Callable<RepositoryDirectoryValue.Builder> c) {
var workerFuture = workerExecutorService.submit(c);
this.workerFuture = workerFuture;
workerFuture.addListener(signalSemaphore::release, directExecutor());
return workerFuture;
}

// This may be called from any thread, including the host Skyframe thread and the
// high-memory-pressure listener thread.
@Override
public void close() {
var myWorkerFuture = workerFuture;
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
}
workerExecutorService.shutdownNow();
}

/**
* Closes the state object, and blocks until all pending async work is finished. The state object
* will reset to a clean slate after this method finishes. This may only be called from the host
* Skyframe thread.
*/
public void closeAndWaitForTermination() throws InterruptedException {
close();
workerExecutorService.close(); // This blocks
// We reset the state object back to its very initial state, since the host SkyFunction may be
// re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238).
reset();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
