Skip to content

Commit

Permalink
Use an Exchanger to rendezvous between the worker and Skyframe threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
lberki committed Apr 26, 2024
1 parent 36216c8 commit 0a86ff7
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -55,14 +54,29 @@ record Success(RepositoryDirectoryValue.Builder result) implements Signal {}
record Failure(Throwable e) implements Signal {}
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/** Dummy object for {@link #rendezvous}. */
static final Object TOKEN = new Object();

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
/** Used to ensure that the worker and Skyframe threads both are at a known place. */
private final Exchanger rendezvous = new Exchanger();

/** The slot for messages from the worker thread to the Skyframe thread.
*
* <p>The contract is that this can ever be set to non-null from the worker thread, set to null
* from the Skyframe thread and once it is set to a value by the worker thread and it has
* rendezvoused with the Skyframe thread, it the worker thread is not allowed to overwrite it
* before another rendezvous.
*/
Signal signal;

/** The slot for messages from the Skyframe thread to the worker thread, which can only be of
* one kind: an environment.
*
* <p>Can ever be set to non-null by the Skyframe thread, set to null by the worker thread and
* if the main thread sets it to something and has rendezvoused with the worker thread, it needs
* to wait until another rendezvous before it overwrites it.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
SkyFunction.Environment latestEnvironment;

/** The working thread that actually performs the fetching logic. */
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
Expand All @@ -81,15 +95,36 @@ record Failure(Throwable e) implements Signal {}
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
signalQueue.put(new Signal.Restart());
return delegateEnvQueue.take();
signal = new Signal.Restart();
// First unblock the Skyframe thread, then wait until it has something else to say. The only
// thing the Skyframe thread can do to this one is an interrupt so there is no need to do
// anything special in between.
rendezvous();
rendezvous();
SkyFunction.Environment result = latestEnvironment;
latestEnvironment = null;
return result;
}

public void join() {
Uninterruptibles.joinUninterruptibly(workerThread);
workerThread = null;
}

void rendezvous() throws InterruptedException {
rendezvous.exchange(TOKEN);
}

void rendezvousUninterruptibly() {
while(true) {
try {
rendezvous();
return;
} catch (InterruptedException e) {
}
}
}

@Override
public void close() {
if (workerThread == null) {
Expand All @@ -104,8 +139,8 @@ public void close() {
// part is that in case an interrupt happens, a Success or Failure is eventually posted by the
// worker thread.
while (true) {
Signal s = Uninterruptibles.takeUninterruptibly(signalQueue);
if (s instanceof Signal.Success || s instanceof Signal.Failure) {
rendezvousUninterruptibly();
if (signal instanceof Signal.Success || signal instanceof Signal.Failure) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment;
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import com.google.devtools.build.skyframe.SkyKey;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import javax.annotation.Nullable;
import net.starlark.java.eval.EvalException;
import net.starlark.java.eval.Mutability;
Expand Down Expand Up @@ -152,75 +154,88 @@ public RepositoryDirectoryValue.Builder fetch(
.start(
() -> {
try {
Environment workerEnvInThread = state.delegateEnvQueue.take();
state.signalQueue.put(
new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnvInThread,
state.recordedInputValues,
key)));
state.rendezvous();
SkyFunction.Environment workerEnvInThread = state.latestEnvironment;
state.signal = new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnvInThread,
state.recordedInputValues,
key));
state.rendezvous();
} catch (Throwable e) {
// No matter what, we must inform the Skyframe thread that we are done. So
// don't let an InterruptedException deter us from doing so.
Uninterruptibles.putUninterruptibly(state.signalQueue, new Signal.Failure(e));
state.signal = new Signal.Failure(e);
state.rendezvousUninterruptibly();
}
});
state.workerThread = workerThread;
}

// At this point, we can be certain that the worker thread is waiting for an environment: either
// due to a restart (then it's in signalForFreshEnv()) or because it was just started (then it's
// in the lambda passed to Thread.start() above)
Signal signal;
try {
state.delegateEnvQueue.put(workerEnv);
signal = state.signalQueue.take();
} catch (InterruptedException e) {
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.close();
throw e;
}
// The worker thread is probably waiting for an environment: either due to a restart (then it's
// in signalForFreshEnv()) or because it was just started (then it's in the lambda passed to
// Thread.start() above). However, it's not impossible that it got interrupted in the meantime,
// in which case it'll be at the final rendezvousUninterruptibly() in the lambda above with
// state.signal already set.
do {
try {
state.latestEnvironment = workerEnv;
// First give the new environment to the worker thread, then see what it has to say.
// But if it gets interrupted in between, handle that appropriately.
state.rendezvous();
if (state.signal != null) {
break;
}

if (signal instanceof Signal.Restart) {
// The worker thread determined that it needs some new values. Restart the SkyFunction and
// eventually send the new environment over to it.
return null;
}
state.rendezvous();
} catch (InterruptedException e) {
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.close();
throw e;
}
} while (false);

// The worker thread finished its job, either successfully or not. Close the state and pass
// whatever its result is to Skyframe.
if (signal instanceof Signal.Success s) {
state.join();
Signal signal = state.signal;
state.signal = null;
return switch (signal) {
case Signal.Restart unused ->
// The worker thread determined that it needs some new values. Restart the SkyFunction and
// eventually send the new environment over to it.
null;

case Signal.Success(RepositoryDirectoryValue.Builder result) -> {
state.join();
if (!env.valuesMissing()) {
recordedInputValues.putAll(state.recordedInputValues);
yield result;
}

if (env.valuesMissing()) {
// This is a special case: RepoFetchingWorkerSkyFunctionEnvironment does not faithfully
// reproduce valuesMissing() in the SkyFunction.Environment it wraps, so it's possible that
// Skyframe thinks a restart is needed but RFWSFE does not. This is arguably a bug in the
// latter, but let's do the simple thing to recover and wipe the state clean.
//
// This case happens when --keep_going is in effect and when getValue() is called to learn the
// value requested by a previous getValuesAndExceptions() that turns out to be in error: in
// this case, SkyFunction.Environment sets valuesMissing to true and that's not reflected in
// RFWSFE.
return null;
// This case happens when --keep_going is in effect and when getValue() is called to learn
// the value requested by a previous getValuesAndExceptions() that turns out to be in error:
// in this case, SkyFunction.Environment sets valuesMissing to true and that's not reflected
// in RFWSFE.
yield null;
}

recordedInputValues.putAll(state.recordedInputValues);
return s.result();
} else if (signal instanceof Signal.Failure f) {
state.join();
Throwable e = f.e();
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);
} else {
throw new IllegalStateException();
}
case Signal.Failure(Throwable e) -> {
state.join();
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);
}
};
}

@Nullable
Expand Down

0 comments on commit 0a86ff7

Please sign in to comment.