Skip to content

Commit

Permalink
Fix a deadlock in Starlark repository workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
lberki committed Apr 26, 2024
1 parent 4a84f30 commit 36216c8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
Expand Down Expand Up @@ -84,25 +85,32 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
return delegateEnvQueue.take();
}

public void join() {
Uninterruptibles.joinUninterruptibly(workerThread);
workerThread = null;
}

@Override
public void close() {
var myWorkerThread = workerThread;
if (myWorkerThread != null) {
myWorkerThread.interrupt();
if (workerThread == null) {
return;
}
// DON'T set workerThread to null; someone should always call `closeAndWaitForTermination` to
// make sure the workerThread stops running.
}
workerThread.interrupt();

public void closeAndWaitForTermination() throws InterruptedException {
var myWorkerThread = workerThread;
workerThread = null;
if (myWorkerThread != null) {
myWorkerThread.interrupt();
Uninterruptibles.joinUninterruptibly(myWorkerThread);
}
if (Thread.interrupted()) {
throw new InterruptedException();
// Wait until the worker thread actually gets interrupted. Be resilient to cases where despite
// the interrupt above, the worker thread was already trying to post a restart. I'm not sure if
// that can happen; theoretically, it looks like it shouldn't be but I'm not intimately familiar
// with the exact semantics of thread interruption and it's cheap to be resilient. The important
// part is that in case an interrupt happens, a Success or Failure is eventually posted by the
// worker thread.
while (true) {
Signal s = Uninterruptibles.takeUninterruptibly(signalQueue);
if (s instanceof Signal.Success || s instanceof Signal.Failure) {
break;
}
}

Uninterruptibles.joinUninterruptibly(workerThread);
workerThread = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Table.Cell;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.RuleDefinition;
import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent;
Expand Down Expand Up @@ -115,6 +116,10 @@ public void reportSkyframeRestart(Environment env, RepositoryName repoName) {
}
}

private void workerThread() {

}

@Nullable
@Override
public RepositoryDirectoryValue.Builder fetch(
Expand All @@ -135,81 +140,87 @@ public RepositoryDirectoryValue.Builder fetch(
return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key);
}
var state = env.getState(RepoFetchingSkyKeyComputeState::new);
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
var workerThread = state.workerThread;
if (workerThread == null) {
// No worker is running yet, which means we're just starting to fetch this repo. Start with a
// clean slate, and create the worker.
setupRepoRoot(outputDirectory);
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
workerThread =
Thread.ofVirtual()
.name("starlark-repository-" + rule.getName())
.start(
() -> {
try {
try {
state.signalQueue.put(
new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnv,
state.recordedInputValues,
key)));
} catch (Throwable e) {
state.signalQueue.put(new Signal.Failure(e));
}
} catch (InterruptedException e) {
// Do nothing. We already tried to put a signal onto the queue, and got
// interrupted again. Guess we'll die!
Environment workerEnvInThread = state.delegateEnvQueue.take();
state.signalQueue.put(
new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnvInThread,
state.recordedInputValues,
key)));
} catch (Throwable e) {
// No matter what, we must inform the Skyframe thread that we are done. So
// don't let an InterruptedException deter us from doing so.
Uninterruptibles.putUninterruptibly(state.signalQueue, new Signal.Failure(e));
}
});
state.workerThread = workerThread;
} else {
// A worker is already running. This can only mean one thing -- we just had a Skyframe
// restart, and need to send over a fresh Environment.
state.delegateEnvQueue.put(env);
}

// At this point, we can be certain that the worker thread is waiting for an environment: either
// due to a restart (then it's in signalForFreshEnv()) or because it was just started (then it's
// in the lambda passed to Thread.start() above)
Signal signal;
try {
state.delegateEnvQueue.put(workerEnv);
signal = state.signalQueue.take();
} catch (InterruptedException e) {
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.closeAndWaitForTermination();
state.close();
throw e;
}
if (!(signal instanceof Signal.Restart)) {
// If `signal` is not a restart, the worker thread has definitely finished. But in some corner
// cases (see b/330892334), a Skyframe restart might still happen; to ensure we're not tricked
// into a deadlock, we clean up the worker thread and so that next time we come into fetch(),
// we actually restart the entire computation.
state.closeAndWaitForTermination();

if (signal instanceof Signal.Restart) {
// The worker thread determined that it needs some new values. Restart the SkyFunction and
// eventually send the new environment over to it.
return null;
}
return switch (signal) {
case Signal.Restart() -> null;
case Signal.Success(RepositoryDirectoryValue.Builder result) -> {
recordedInputValues.putAll(state.recordedInputValues);
yield result;
}
case Signal.Failure(Throwable e) -> {
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e);
if (e instanceof InterruptedException) {
// This means that the worker thread was interrupted, but the host Skyframe thread was not.
// This can only happen if the state object was invalidated due to memory pressure, in
// which case we can simply reattempt the fetch.
env.getListener()
.post(
RepositoryFetchProgress.ongoing(
RepositoryName.createUnvalidated(rule.getName()),
"fetch interrupted due to memory pressure; restarting."));
yield fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
}
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);

// The worker thread finished its job, either successfully or not. Close the state and pass
// whatever its result is to Skyframe.
if (signal instanceof Signal.Success s) {
state.join();

if (env.valuesMissing()) {
// This is a special case: RepoFetchingWorkerSkyFunctionEnvironment does not faithfully
// reproduce valuesMissing() in the SkyFunction.Environment it wraps, so it's possible that
// Skyframe thinks a restart is needed but RFWSFE does not. This is arguably a bug in the
// latter, but let's do the simple thing to recover and wipe the state clean.
//
// This case happens when --keep_going is in effect and when getValue() is called to learn the
// value requested by a previous getValuesAndExceptions() that turns out to be in error: in
// this case, SkyFunction.Environment sets valuesMissing to true and that's not reflected in
// RFWSFE.
return null;
}
};

recordedInputValues.putAll(state.recordedInputValues);
return s.result();
} else if (signal instanceof Signal.Failure f) {
state.join();
Throwable e = f.e();
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);
} else {
throw new IllegalStateException();
}
}

@Nullable
Expand Down

0 comments on commit 36216c8

Please sign in to comment.