Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a Thread instead of an ExecutorService for Starlark repository workers #22139

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
import com.google.devtools.build.lib.analysis.RuleDefinition;
Expand Down Expand Up @@ -63,6 +62,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
Expand Down Expand Up @@ -120,9 +120,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -162,9 +159,6 @@ public class BazelRepositoryModule extends BlazeModule {
private List<String> allowedYankedVersions = ImmutableList.of();
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool =
Executors.newFixedThreadPool(
100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());

@Nullable private CredentialModule credentialModule;

Expand Down Expand Up @@ -314,37 +308,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
case AUTO:
try {
// Since Google hasn't migrated to JDK 21 yet, we can't directly call
// Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
// anyone... right? (OSS Bazel already ships with a bundled JDK 21)
starlarkRepositoryFunction.setWorkerExecutorService(
(ExecutorService)
Executors.class
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(
null, Thread.ofVirtual().name("starlark-repository-", 0).factory()));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
} else {
throw new AbruptExitException(
detailedExitCode(
"couldn't create virtual worker thread executor for repo fetching",
Code.BAD_DOWNLOADER_CONFIG),
e);
}
}
}
starlarkRepositoryFunction.setUseWorkers(
repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,8 @@ public Converter() {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
+ " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
+ " otherwise no worker thread is used.")
+ " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+ " thread.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import javax.annotation.Nullable;

/**
Expand All @@ -38,38 +39,31 @@
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

/** A signal that the worker thread can send to the host Skyframe thread. */
enum Signal {
sealed interface Signal {
/**
* Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
* sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting
* for the host thread to send a fresh {@link SkyFunction.Environment} over.
*/
RESTART,
/**
* Indicates that the worker thread has finished running, either yielding a result or an
* exception.
*/
DONE
}
record Restart() implements Signal {}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/** Indicates that the worker thread has finished running successfully. */
record Success(RepositoryDirectoryValue.Builder result) implements Signal {}

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
/** Indicates that the worker thread has finished running with a failure. */
record Failure(Throwable e) implements Signal {}
}

/**
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
* to tell whether a worker thread is already running.
*/
/** Used to ensure that the worker and Skyframe threads both are at a known place. */
private final Exchanger rendezvous = new Exchanger();

/** The working thread that actually performs the fetching logic. */
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;
// could happen on multiple threads. Interrupting and joining a thread multiple times is safe,
// though, so we only need to worry about nullness. Using a mutex/synchronization is an
// alternative but it means we might block in `close()`, which is potentially bad (see its
// javadoc).
@Nullable volatile Thread workerThread = null;

/**
* This is where the recorded inputs & values for the whole invocation is collected.
Expand All @@ -80,16 +74,65 @@ enum Signal {
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
signalQueue.put(Signal.RESTART);
return delegateEnvQueue.take();
// First unblock the Skyframe thread, then wait until it has something else to say. The only
// thing the Skyframe thread can do to this one is an interrupt so there is no need to do
// anything special in between.
rendezvousFromWorker(new Signal.Restart());
return rendezvousFromWorker(null);
}

SkyFunction.Environment rendezvousFromWorker(Signal signal) throws InterruptedException {
return (SkyFunction.Environment) rendezvous.exchange(signal);
}

Signal rendezvousFromHost(SkyFunction.Environment environment) throws InterruptedException {
return (Signal) rendezvous.exchange(environment);
}

Signal rendezvousUninterruptiblyFromHost(SkyFunction.Environment environment) {
while(true) {
try {
return (Signal) rendezvous.exchange(environment);
} catch (InterruptedException e) {
}
}
}

SkyFunction.Environment rendezvousUninterruptiblyFromWorker(Signal signal) {
while(true) {
try {
return (SkyFunction.Environment) rendezvous.exchange(signal);
} catch (InterruptedException e) {
}
}
}

public void join() {
Uninterruptibles.joinUninterruptibly(workerThread);
workerThread = null;
}

@Override
public void close() {
var myWorkerFuture = workerFuture;
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
if (workerThread == null) {
return;
}
workerThread.interrupt();

// Wait until the worker thread actually gets interrupted. Be resilient to cases where despite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes against the comment on line 64 (ie. we're not supposed to block in close()).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize that this is not supposed to block, but in my not-so-humble opinion it's much better to block a bit than to have unaccounted for state floating around for an unspecified amount of time. This is especially bad because the worker thread can have side effects visible outside of Bazel (since it can do I/O, write files, read files, etc., all from user code) and it sounds like a terrible idea to have that going on without the knowledge of the rest of Bazel.

That said, my sticking point is not that we block here, it's that we don't do anything with side effects outside of the worker thread after this method returns so if you have any clever ideas as to how to implement that without blocking, I'm all ears.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is especially bad because the worker thread can have side effects visible outside of Bazel (since it can do I/O, write files, read files, etc., all from user code) and it sounds like a terrible idea to have that going on without the knowledge of the rest of Bazel.

Isn't that why we block in the host thread? The memory pressure listener thread just interrupts the worker thread, and it's the host thread's job to make sure the worker is finished before moving on. It's not necessary that the worker thread is terminated right after close().

// the interrupt above, the worker thread was already trying to post a restart. I'm not sure if
// that can happen; theoretically, it looks like it shouldn't be but I'm not intimately familiar
// with the exact semantics of thread interruption and it's cheap to be resilient. The important
// part is that in case an interrupt happens, a Success or Failure is eventually posted by the
// worker thread.
while (true) {
Signal signal = rendezvousUninterruptiblyFromHost(null);
if (signal instanceof Signal.Success || signal instanceof Signal.Failure) {
break;
}
}

Uninterruptibles.joinUninterruptibly(workerThread);
workerThread = null;
}
}