Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a Semaphore for signaling in repo fetching #22100

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
import com.google.devtools.build.lib.analysis.RuleDefinition;
Expand Down Expand Up @@ -64,6 +63,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
Expand Down Expand Up @@ -121,9 +121,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -163,9 +160,6 @@ public class BazelRepositoryModule extends BlazeModule {
private List<String> allowedYankedVersions = ImmutableList.of();
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool =
Executors.newFixedThreadPool(
100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());

@Nullable private CredentialModule credentialModule;

Expand Down Expand Up @@ -316,37 +310,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
case AUTO:
try {
// Since Google hasn't migrated to JDK 21 yet, we can't directly call
// Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
// anyone... right? (OSS Bazel already ships with a bundled JDK 21)
starlarkRepositoryFunction.setWorkerExecutorService(
(ExecutorService)
Executors.class
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(
null, Thread.ofVirtual().name("starlark-repository-", 0).factory()));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
} else {
throw new AbruptExitException(
detailedExitCode(
"couldn't create virtual worker thread executor for repo fetching",
Code.BAD_DOWNLOADER_CONFIG),
e);
}
}
}
starlarkRepositoryFunction.setUseWorkers(
repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,8 @@ public Converter() {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
+ " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
+ " otherwise no worker thread is used.")
+ " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+ " thread.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,53 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/**
* Captures state that persists across different invocations of {@link
* com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
* StarlarkRepositoryFunction}.
*
* <p>This class is used to hold on to a worker thread (in reality just a {@link Future} object)
* when fetching repos using a worker thread is enabled. The worker thread uses a {@link
* SkyFunction.Environment} object acquired from the host thread, and can signal the host thread to
* restart to get a fresh environment object.
* <p>This class is used to hold on to a worker thread when fetching repos using a worker thread is
* enabled. The worker thread uses a {@link SkyFunction.Environment} object acquired from the host
* thread, and can signal the host thread to restart to get a fresh environment object.
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

/**
* A signal that the worker thread can send to the host Skyframe thread over {@code signalQueue}.
*/
enum Signal {
/**
* Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
* sending this signal, the worker thread will immediately block on {@code delegateEnvQueue},
* waiting for the host thread to send a fresh {@link SkyFunction.Environment} over.
*/
RESTART,
/**
* Indicates that the worker thread has finished running, either yielding a result or an
* exception.
*/
DONE
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/**
* A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
* (successfully or otherwise), or to indicate that the host thread should return {@code null},
* causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
* delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
* over.
*/
// A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
// be interrupted.
final Semaphore signalSemaphore = new Semaphore(0);

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
// We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
// gets interrupted before the host thread restarts, the host thread doesn't hang forever.
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new ArrayBlockingQueue<>(1);

/**
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
Expand All @@ -69,7 +70,14 @@ enum Signal {
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;
@Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
// sure the worker thread has shut down (with its blocking `close()` method).
ListeningExecutorService workerExecutorService;

private final String repoName;

/**
* This is where the recorded inputs & values for the whole invocation are collected.
Expand All @@ -79,17 +87,69 @@ enum Signal {
*/
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

/**
* Creates the state object for the repo with the given name and immediately initializes it via
* {@code reset()}. The name is used there to name the worker thread.
*/
RepoFetchingSkyKeyComputeState(String repoName) {
this.repoName = repoName;
reset();
}

// Restores this state object to a clean slate backed by a brand-new worker executor. Must only be
// invoked from the host Skyframe thread, *and* only while no worker thread is running.
private void reset() {
  // Each reset gets its own thread-per-task executor whose virtual thread is named after the repo.
  var workerThreadFactory = Thread.ofVirtual().name("starlark-repository-" + repoName).factory();
  var freshExecutor = Executors.newThreadPerTaskExecutor(workerThreadFactory);
  workerExecutorService = MoreExecutors.listeningDecorator(freshExecutor);

  // Discard anything left over from a previous worker: pending signals, an undelivered
  // environment, and previously recorded inputs.
  signalSemaphore.drainPermits();
  delegateEnvQueue.clear();
  recordedInputValues.clear();
}

/**
* Releases a permit on the {@code signalSemaphore} and immediately expects a fresh Environment
* back. This may only be called from the worker thread.
*
* @return the next {@link SkyFunction.Environment} taken from {@code delegateEnvQueue}
* @throws InterruptedException if the calling thread is interrupted while blocked on a queue
*/
SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
// NOTE(review): this put() and the release() below both appear in the diff view this text was
// captured from; presumably only one of them belongs to the final method -- confirm.
signalQueue.put(Signal.RESTART);
signalSemaphore.release();
// Blocks until an environment is available in the queue.
return delegateEnvQueue.take();
}

/**
* Launches the worker: submits {@code c} to the worker executor, records the resulting future in
* the {@code workerFuture} field, and arranges for one permit to be released on {@code
* signalSemaphore} as soon as that future completes, whether successfully or otherwise. This may
* only be called from the host Skyframe thread.
*
* @param c the work to run on the worker thread
* @return the future tracking the newly started worker
*/
ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
    Callable<RepositoryDirectoryValue.Builder> c) {
  ListenableFuture<RepositoryDirectoryValue.Builder> submitted = workerExecutorService.submit(c);
  // Publish the future before attaching the listener, mirroring the field's role as the handle
  // used to cancel the worker.
  this.workerFuture = submitted;
  submitted.addListener(signalSemaphore::release, directExecutor());
  return submitted;
}

// This may be called from any thread, including the host Skyframe thread and the
// high-memory-pressure listener thread.
@Override
public void close() {
var myWorkerFuture = workerFuture;
Wyverald marked this conversation as resolved.
Show resolved Hide resolved
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
Wyverald marked this conversation as resolved.
Show resolved Hide resolved
}
workerExecutorService.shutdownNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: doesn't shutdownNow() cancel all pending work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it does. If you're asking why I call shutdownNow() and future.cancel(true): per my reading of the code, shutdownNow() interrupts the worker thread, but does not technically cancel the future. So the future would end with a state of "finished exceptionally" with an InterruptedException. I think it's better to be consistent here and actually mark the future as canceled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL. Out of curiosity, did you experimentally verify this? It makes some weird sort of sense, but AFAIU each task an ExecutorService runs is wrapped in a future, so what would be the point of interrupting a task without cancelling the associated future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't experimentally verified this. But anyhow, the ThreadPerTaskExecutor does not hold on to the Futures it returns from its submit() method; it also has an execute() method that doesn't create a Future at all.

}

/**
* Closes the state object, and blocks until all pending async work is finished. The state object
* will reset to a clean slate after this method finishes. This may only be called from the host
* Skyframe thread.
*
* @throws InterruptedException if the calling thread's interrupt flag is found set after the
*     shutdown and reset have completed
*/
public void closeAndWaitForTermination() throws InterruptedException {
// Cancel the worker future and ask the executor to shut down.
close();
workerExecutorService.close(); // This blocks
// Only safe once the executor has fully terminated: reset() requires that no worker is running.
reset();
Wyverald marked this conversation as resolved.
Show resolved Hide resolved
// Thread.interrupted() also clears the flag, so the interruption is surfaced only via the throw.
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}