Skip to content

Commit

Permalink
Use an Exchanger to rendezvous between the worker and Skyframe threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
lberki committed Apr 26, 2024
1 parent 36216c8 commit 72707a9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -55,14 +54,8 @@ record Success(RepositoryDirectoryValue.Builder result) implements Signal {}
record Failure(Throwable e) implements Signal {}
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
/**
 * Used to ensure that the worker and Skyframe threads both are at a known place.
 *
 * <p>The two sides hand over different kinds of objects through the same exchanger: the worker
 * thread offers a {@code Signal} (or {@code null}) and the host Skyframe thread offers a {@link
 * SkyFunction.Environment} (or {@code null}). The element type is therefore {@code Object} and
 * each side casts what it receives to the type it expects from its peer.
 */
private final Exchanger<Object> rendezvous = new Exchanger<>();

/** The working thread that actually performs the fetching logic. */
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
Expand All @@ -81,8 +74,37 @@ record Failure(Throwable e) implements Signal {}
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

/**
 * Asks the host Skyframe thread for a new {@link SkyFunction.Environment}.
 *
 * <p>Performs two exchanges in a row: the first one hands a {@code Signal.Restart} to the host
 * thread, the second one offers {@code null} and returns whatever the host thread hands back.
 *
 * @return the object received from the host thread in the second exchange
 * @throws InterruptedException if the calling thread is interrupted while waiting in either
 *     exchange
 */
SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
  // First unblock the Skyframe thread, then wait until it has something else to say. The only
  // thing the Skyframe thread can do to this one is an interrupt so there is no need to do
  // anything special in between.
  rendezvousFromWorker(new Signal.Restart());
  return rendezvousFromWorker(null);
}

/**
 * Worker-side half of the rendezvous: offers {@code signal} to the peer thread and blocks until
 * the peer arrives at the exchanger.
 *
 * @param signal the value handed to the peer; callers in this class also pass {@code null}
 * @return the object offered by the peer, cast to {@link SkyFunction.Environment}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
SkyFunction.Environment rendezvousFromWorker(Signal signal) throws InterruptedException {
  Object fromHost = rendezvous.exchange(signal);
  return (SkyFunction.Environment) fromHost;
}

/**
 * Host-side half of the rendezvous: offers {@code environment} to the peer thread and blocks
 * until the peer arrives at the exchanger.
 *
 * @param environment the value handed to the peer
 * @return the object offered by the peer, cast to {@code Signal}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
Signal rendezvousFromHost(SkyFunction.Environment environment) throws InterruptedException {
  Object fromWorker = rendezvous.exchange(environment);
  return (Signal) fromWorker;
}

/**
 * Host-side rendezvous that cannot be aborted by an interrupt: the exchange is retried until it
 * completes.
 *
 * <p>An interrupt received while waiting is not lost. It is remembered and the calling thread's
 * interrupt status is re-asserted before this method returns, so that callers further up the
 * stack can still observe it.
 *
 * @param environment the value handed to the peer thread
 * @return the object offered by the peer, cast to {@code Signal}
 */
Signal rendezvousUninterruptiblyFromHost(SkyFunction.Environment environment) {
  boolean interrupted = false;
  try {
    while (true) {
      try {
        return (Signal) rendezvous.exchange(environment);
      } catch (InterruptedException e) {
        // exchange() cleared the interrupt status when it threw; note it and retry.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      // Restore the interrupt status that was swallowed while retrying.
      Thread.currentThread().interrupt();
    }
  }
}

/**
 * Worker-side rendezvous that cannot be aborted by an interrupt: the exchange is retried until it
 * completes.
 *
 * <p>An interrupt received while waiting is not lost. It is remembered and the calling thread's
 * interrupt status is re-asserted before this method returns.
 *
 * @param signal the value handed to the peer thread
 * @return the object offered by the peer, cast to {@link SkyFunction.Environment}
 */
SkyFunction.Environment rendezvousUninterruptiblyFromWorker(Signal signal) {
  boolean interrupted = false;
  try {
    while (true) {
      try {
        return (SkyFunction.Environment) rendezvous.exchange(signal);
      } catch (InterruptedException e) {
        // exchange() cleared the interrupt status when it threw; note it and retry.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      // Restore the interrupt status that was swallowed while retrying.
      Thread.currentThread().interrupt();
    }
  }
}

public void join() {
Expand All @@ -104,8 +126,8 @@ public void close() {
// part is that in case an interrupt happens, a Success or Failure is eventually posted by the
// worker thread.
while (true) {
Signal s = Uninterruptibles.takeUninterruptibly(signalQueue);
if (s instanceof Signal.Success || s instanceof Signal.Failure) {
Signal signal = rendezvousUninterruptiblyFromHost(null);
if (signal instanceof Signal.Success || signal instanceof Signal.Failure) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment;
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import com.google.devtools.build.skyframe.SkyKey;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import javax.annotation.Nullable;
import net.starlark.java.eval.EvalException;
import net.starlark.java.eval.Mutability;
Expand Down Expand Up @@ -152,75 +154,84 @@ public RepositoryDirectoryValue.Builder fetch(
.start(
() -> {
try {
Environment workerEnvInThread = state.delegateEnvQueue.take();
state.signalQueue.put(
new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnvInThread,
state.recordedInputValues,
key)));
SkyFunction.Environment workerEnvInThread = state.rendezvousFromWorker(null);
Signal result = new Signal.Success(
fetchInternal(
rule,
outputDirectory,
directories,
workerEnvInThread,
state.recordedInputValues,
key));
state.rendezvousFromWorker(result);
} catch (Throwable e) {
// No matter what, we must inform the Skyframe thread that we are done. So
// don't let an InterruptedException deter us from doing so.
Uninterruptibles.putUninterruptibly(state.signalQueue, new Signal.Failure(e));
state.rendezvousUninterruptiblyFromWorker(new Signal.Failure(e));
}
});
state.workerThread = workerThread;
}

// At this point, we can be certain that the worker thread is waiting for an environment: either
// due to a restart (then it's in signalForFreshEnv()) or because it was just started (then it's
// in the lambda passed to Thread.start() above)
// The worker thread is probably waiting for an environment: either due to a restart (then it's
// in signalForFreshEnv()) or because it was just started (then it's in the lambda passed to
// Thread.start() above). However, it's not impossible that it got interrupted in the meantime,
// in which case it'll be at the final rendezvousUninterruptibly() in the lambda above with
// state.signal already set.
Signal signal;
try {
state.delegateEnvQueue.put(workerEnv);
signal = state.signalQueue.take();
} catch (InterruptedException e) {
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.close();
throw e;
}
do {
try {
// First give the new environment to the worker thread, then see what it has to say.
// But if it gets interrupted in between, handle that appropriately.
signal = state.rendezvousFromHost(workerEnv);
if (signal != null) {
break;
}

if (signal instanceof Signal.Restart) {
// The worker thread determined that it needs some new values. Restart the SkyFunction and
// eventually send the new environment over to it.
return null;
}
signal = state.rendezvousFromHost(null);
} catch (InterruptedException e) {
// If the host Skyframe thread is interrupted for any reason, we make sure to shut down any
// worker threads and wait for their termination before propagating the interrupt.
state.close();
throw e;
}
} while (false);

// The worker thread finished its job, either successfully or not. Close the state and pass
// whatever its result is to Skyframe.
if (signal instanceof Signal.Success s) {
state.join();
return switch (signal) {
case Signal.Restart unused ->
// The worker thread determined that it needs some new values. Restart the SkyFunction and
// eventually send the new environment over to it.
null;

case Signal.Success(RepositoryDirectoryValue.Builder result) -> {
state.join();
if (!env.valuesMissing()) {
recordedInputValues.putAll(state.recordedInputValues);
yield result;
}

if (env.valuesMissing()) {
// This is a special case: RepoFetchingWorkerSkyFunctionEnvironment does not faithfully
// reproduce valuesMissing() in the SkyFunction.Environment it wraps, so it's possible that
// Skyframe thinks a restart is needed but RFWSFE does not. This is arguably a bug in the
// latter, but let's do the simple thing to recover and wipe the state clean.
//
// This case happens when --keep_going is in effect and when getValue() is called to learn the
// value requested by a previous getValuesAndExceptions() that turns out to be in error: in
// this case, SkyFunction.Environment sets valuesMissing to true and that's not reflected in
// RFWSFE.
return null;
// This case happens when --keep_going is in effect and when getValue() is called to learn
// the value requested by a previous getValuesAndExceptions() that turns out to be in error:
// in this case, SkyFunction.Environment sets valuesMissing to true and that's not reflected
// in RFWSFE.
yield null;
}

recordedInputValues.putAll(state.recordedInputValues);
return s.result();
} else if (signal instanceof Signal.Failure f) {
state.join();
Throwable e = f.e();
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);
} else {
throw new IllegalStateException();
}
case Signal.Failure(Throwable e) -> {
state.join();
Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e);
}
};
}

@Nullable
Expand Down

0 comments on commit 72707a9

Please sign in to comment.