
Use a Thread instead of an ExecutorService for Starlark repository workers #22139

Closed

Conversation

@lberki (Contributor) commented Apr 26, 2024

No description provided.

I managed to reproduce some deadlocks during repo fetching with virtual worker threads. One notable trigger was some _other_ repo failing to fetch, which seems to cause Skyframe to try to interrupt other concurrent repo fetches. This _might_ be the cause of a deadlock where we submit a task to the worker executor service, but the task never starts running before it gets cancelled, which causes us to wait forever for a `DONE` signal that never comes. (The worker task puts a `DONE` signal in the queue in a `finally` block -- but we never even enter the `try`.)
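To make the failure mode concrete, here is a minimal sketch of the pattern (the Signal enum, the fetch method, and the queue setup are illustrative stand-ins, not Bazel's actual code): if cancel() lands before the task ever starts, the finally block never runs, and take() blocks forever.

import java.util.concurrent.*;

class LostDoneSignal {
  enum Signal { RESTART, DONE }

  static void fetch(ExecutorService workerPool) throws InterruptedException {
    BlockingQueue<Signal> signalQueue = new LinkedBlockingQueue<>();
    Future<?> worker = workerPool.submit(() -> {
      try {
        // ... the actual repo fetch would run here ...
      } finally {
        // Never executed if the task is cancelled before it starts running.
        signalQueue.add(Signal.DONE);
      }
    });
    // A Skyframe interrupt can cancel the task before it ever starts...
    worker.cancel(true);
    // ...in which case this waits forever for a DONE that never arrives.
    signalQueue.take();
  }
}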

I then tried various things to fix this; this PR is an attempt that actually seemed to eliminate the deadlock. Instead of waiting for a `DONE` signal to make sure the worker thread has finished, we now hold on to the executor service, which offers a `close()` method that essentially uninterruptibly waits for any scheduled tasks to terminate, whether or not they have started running. (@justinhorvitz had suggested a similar idea before.) To make sure distinct repo fetches don't interfere with each other, we start a separate worker executor service for each repo fetch instead of making everyone share the same worker executor service. (This is recommended for virtual threads; see https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-C0FEE349-D998-4C9D-B032-E01D06BE55F2 for example.)
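A minimal sketch of that shape (not the PR's actual code; fetchRepo, fetchLogic, and the thread name are hypothetical): one virtual-thread executor per repo fetch, closed via try-with-resources. ExecutorService.close() (JDK 19+) waits for scheduled tasks to terminate whether or not they ever started running.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerFetchWorker {
  void fetchRepo(Runnable fetchLogic) {
    // A fresh executor per repo fetch: cheap with virtual threads, and
    // concurrent fetches cannot interfere with each other's workers.
    try (ExecutorService worker =
        Executors.newThreadPerTaskExecutor(
            Thread.ofVirtual().name("starlark-repository-worker").factory())) {
      worker.submit(fetchLogic);
      // ... rendezvous with the worker here ...
    } // close() waits for the task to terminate, whether or not it started.
  }
}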

Related: #22003
@github-actions bot added the team-ExternalDeps (External dependency handling, remote repositories, WORKSPACE file) and awaiting-review (PR is awaiting review from an assigned reviewer) labels Apr 26, 2024
@lberki (Contributor Author) commented Apr 26, 2024

@Wyverald what do you think about this alternative proposal? I essentially took your "use a Thread" change and ran with it -- it turned out to be much simpler than any of the other alternatives and passes the

for i in $(seq 1 1000); do echo RUN $i; ghbazel build --noenable_bzlmod configure_with_bazel_transitive:all; done

test you used as a canary. It also passes a synthetic test that makes Bazel fetch 100 Starlark repositories in parallel and then makes one of them fail, but that test also passes at HEAD, so I'm not sure it's a good benchmark.

I made a number of non-obvious changes:

  1. I made it so that when cleaning up the SkyFunctionState due to memory pressure, the worker thread is joined. This makes the state space a bit less complicated and also ensures we don't leave resources (a virtual thread) hanging around. It also made it possible to simplify the handling of InterruptedException, since it's now impossible for the worker thread to return one to the Skyframe thread under normal circumstances (the carrier thread can still be SIGINT'ed, I think, so we can't ignore that possibility).
  2. When creating a new state, I now pass the SkyFunction.Environment into it in the same way as for a restart. This makes it easy to handle InterruptedException in the Skyframe thread during that BlockingQueue.put() call.

I also have a few questions:

  1. Is SkyKeyComputeState.close() called by Skyframe in any other circumstance than memory pressure? I'm not sure if it would make a difference since it's idempotent and I like being able to look at StarlarkRepositoryFunction.compute() and understand all the possible states that SkyFunction can be in, but still, I'm curious.
  2. Why is RepoFetchingSkyKeyComputeState.workerThread volatile? My understanding is that it's accessed only from close(), from join(), and when the object is created, none of which should interfere with the others.

@lberki (Contributor Author) commented Apr 26, 2024

This one still has one deadlock (that I know of, anyway) which is shared by every alternative: if the worker thread gets hit by a SIGINT in delegateEnvQueue.take() in signalForFreshEnv(), it will go to the putInterruptibly() call in catch (Throwable e) in the lambda that is passed to Thread.start(). In that case, the worker thread will hang in signalQueue.put() and the Skyframe thread in delegateEnvQueue.put(). Let's see if I can fix that...
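Reduced to its essentials, the two-queue deadlock looks like the following (a hypothetical reduction reusing the queue names above; the sleeps are demo-only scheduling aids to force exactly the interleaving described). The program hangs by design, each thread parked in a put() the other side will never match:

import java.util.concurrent.SynchronousQueue;

class TwoQueueDeadlock {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> signalQueue = new SynchronousQueue<>();      // worker -> Skyframe
    SynchronousQueue<Object> delegateEnvQueue = new SynchronousQueue<>(); // Skyframe -> worker

    Thread worker = Thread.ofVirtual().start(() -> {
      try {
        delegateEnvQueue.take(); // the interrupt lands while parked here...
      } catch (Throwable e) {
        try {
          signalQueue.put("failure"); // ...so the worker parks here forever
        } catch (InterruptedException ignored) {
        }
      }
    });
    Thread.sleep(100); // let the worker park in take()
    worker.interrupt();
    Thread.sleep(100); // let the worker move into signalQueue.put()
    delegateEnvQueue.put(new Object()); // the Skyframe thread parks here: deadlock
  }
}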

@lberki (Contributor Author) commented Apr 26, 2024

This alternative is free of any deadlocks I know of.

The reason why I stopped using SynchronousQueue is that we'd need two of them, and I don't think that works: in signalForFreshEnv(), for example, an interrupt can arrive either in .put() or in .take(), but when it does, the next synchronization call will be a .put() to tell the Skyframe thread about the interrupt. So the Skyframe thread would need to be prepared to handle both cases, but it can't wait on two SynchronousQueues at the same time. Using only one synchronization object avoids that problem.

WDYT?

@lberki force-pushed the lberki-repoworker-thread branch 5 times, most recently from 0a86ff7 to b5c0abb on April 26, 2024 at 14:05
.start(
    () -> {
      try {
        SkyFunction.Environment workerEnvInThread = state.rendezvousFromWorker(null);

Member:

I'm not a big fan of the use of Exchanger here.

  1. Often the information exchange is one-way only (here and on line 166). You might as well be using SynchronousQueue.
  2. There is no type safety; untyped Objects are being passed around. This is mitigated by the wrapper rendezvous* methods but at that point why don't we just use two SynchronousQueues?
  3. Following on from the two points above: it's really hard to tell what's going on at any given point. Should we exchange by giving something? And/or should we ask for something back? What are we giving or asking for back? These questions can be answered, but you need to look really carefully at the code on both sides to even begin answering them. Whereas previously the "state" was very easy to tell: either the worker sends back a RESTART and expects an Environment back, or it sends back a DONE and is finished.

Member:

ah sorry, I missed your comment explaining your rationale for Exchanger before. I now understand the reason better, but still think point 3 above stands. Maybe it's just a matter of naming? Or maybe a clearer demarcation of states.

lberki (Contributor Author):

Sigh. No, I'm not a fan of Exchanger either, and its appearance is an obvious sign of desperation, but I have no better ideas. Trimmed to the bone, the worker thread does this:

try {
  while (!done) {
    var env = getNewEnvironment();
    doThings(env);
    send("success");
  }
} catch (Throwable e) {
  send("failure");
}

and the Skyframe function does this:

try {
  send(env);
  var result = receive();
  return result == "restart" ? null : result;
} catch (Throwable e) {
  return "failure";
}

and the discrepancy between the two halves is that the worker thread can consider sending a new request and getting a new environment an atomic operation, but the Skyframe thread cannot, because it needs to do work in between.

It is also the case that when an interrupt happens, the worker thread will end up at send("failure"), regardless of whether the Skyframe thread expects it to send or receive a value. IOW, the Skyframe thread must be ready to receive a failure even when the worker thread signaled that it wants to receive something instead. This is why two blocking data structures inherently don't work.

The way I got here is that I thought that it would be easier to model this with two state machines passing messages between each other, then realized that writing a state machine and handling exceptions in parallel isn't nice (I tried), then I realized that executing code is just a state machine whose state is on the stack (and registers, etc.), then I started looking for a bidirectional communication mechanism between two threads, then I found Exchanger.

No, I'm not happy about this but this is about the best I could come up with :(
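For illustration, here is roughly what the single-rendezvous-object shape looks like (a sketch under assumed message types; the typed Message hierarchy is my addition for readability, whereas the actual change passes untyped Objects through rendezvous* wrapper methods). Both directions go through one Exchanger, so whichever side ends up reporting a failure, the other side is guaranteed to be parked on the same object:

import java.util.concurrent.Exchanger;

class RendezvousSketch {
  sealed interface Message permits Env, Success, Failure {}
  record Env(Object skyframeEnv) implements Message {}
  record Success(Object result) implements Message {}
  record Failure(Throwable cause) implements Message {}

  private final Exchanger<Message> rendezvous = new Exchanger<>();

  // Worker side: reporting the previous step's outcome and receiving a fresh
  // environment is a single atomic exchange. The cast is exactly the
  // type-safety weakness point 2 above complains about.
  Env rendezvousFromWorker(Message outcome) throws InterruptedException {
    return (Env) rendezvous.exchange(outcome);
  }

  // Skyframe side: hand over the environment and take whatever comes back --
  // possibly a Failure even when a Success was expected.
  Message rendezvousFromSkyframe(Env env) throws InterruptedException {
    return rendezvous.exchange(env);
  }
}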

}
workerThread.interrupt();

// Wait until the worker thread actually gets interrupted. Be resilient to cases where despite

Member:

This goes against the comment on line 64 (ie. we're not supposed to block in close()).

lberki (Contributor Author):

I realize that this is not supposed to block, but in my not-so-humble opinion it's much better to block a bit than to have unaccounted-for state floating around for an unspecified amount of time. This is especially bad because the worker thread can have side effects visible outside of Bazel (since it can do I/O, write files, read files, etc., all from user code), and it sounds like a terrible idea to have that going on without the knowledge of the rest of Bazel.

That said, my sticking point is not that we block here; it's that no side-effecting work keeps running on the worker thread after this method returns. If you have any clever ideas as to how to implement that without blocking, I'm all ears.

Member:

This is especially bad because the worker thread can have side effects visible outside of Bazel (since it can do I/O, write files, read files, etc., all from user code) and it sounds like a terrible idea to have that going on without the knowledge of the rest of Bazel.

Isn't that why we block in the host thread? The memory pressure listener thread just interrupts the worker thread, and it's the host thread's job to make sure the worker is finished before moving on. It's not necessary that the worker thread is terminated right after close().
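A sketch of that division of labor (names hypothetical, not the actual RepoFetchingSkyKeyComputeState): the listener-facing close() only interrupts and returns, and only the host thread blocks:

class ComputeStateSketch implements AutoCloseable {
  private volatile Thread workerThread;

  ComputeStateSketch(Runnable work) {
    workerThread = Thread.ofVirtual().start(work);
  }

  // Callable from the memory-pressure listener: interrupt and return,
  // never block.
  @Override
  public void close() {
    Thread t = workerThread;
    if (t != null) {
      t.interrupt();
    }
  }

  // Called by the host (Skyframe) thread before it moves on; blocking here
  // is acceptable because this thread owns the worker's side effects.
  void joinWorker() throws InterruptedException {
    Thread t = workerThread;
    if (t != null) {
      t.join();
      workerThread = null;
    }
  }
}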

@Wyverald (Member) commented:

  • Is SkyKeyComputeState.close() called by Skyframe in any other circumstance than memory pressure? I'm not sure if it would make a difference since it's idempotent and I like being able to look at StarlarkRepositoryFunction.compute() and understand all the possible states that SkyFunction can be in, but still, I'm curious.

I don't think so, but I'm not 100% sure about anything at this point...

  • Why is RepoFetchingSkyKeyComputeState.workerThread volatile? My understanding is that it's accessed only from close(), from join(), and when the object is created, none of which should interfere with the others.

The comment there explains it. close() can technically be called by multiple things at the same time (high memory pressure watcher & host thread, for example), and we use volatile to make sure there are no races between threads setting null.

@brentleyjones changed the title from "Use a Thread instead of and ExecutorService for Starlark repository workers" to "Use a Thread instead of an ExecutorService for Starlark repository workers" on Apr 29, 2024
@lberki (Contributor Author) commented Apr 29, 2024

  • Is SkyKeyComputeState.close() called by Skyframe in any other circumstance than memory pressure? I'm not sure if it would make a difference since it's idempotent and I like being able to look at StarlarkRepositoryFunction.compute() and understand all the possible states that SkyFunction can be in, but still, I'm curious.

I don't think so, but I'm not 100% sure about anything at this point...

Argh, it looks like we do invalidate stateCache from UnnecessaryTemporaryStateDropper, which happens in response to JMX events, etc., which I mentally translate to "whenever and on whatever thread the JVM damn well pleases". That doesn't sit well with blocking in close(), but it's the lesser evil compared to letting side effects run amok...

  • Why is RepoFetchingSkyKeyComputeState.workerThread volatile? My understanding is that it's accessed only from close(), from join(), and when the object is created, none of which should interfere with the others.

The comment there explains it. close() can technically be called by multiple things at the same time (high memory pressure watcher & host thread, for example), and we use volatile to make sure there are no races between threads setting null.

But then it should be an AtomicReference and we should use getAndSet(), right? volatile does not guarantee that two threads don't read the same value. Technically, Future.cancel() is idempotent, but I'd much rather be obviously correct than rely on guarantees other code promises (but does not necessarily keep...)
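For the record, the obviously-correct variant being argued for might look like this (a sketch, not the PR's code; CloseOnce and setWorker are hypothetical names): getAndSet(null) guarantees that at most one of the racing close() calls observes the live thread.

import java.util.concurrent.atomic.AtomicReference;

class CloseOnce {
  private final AtomicReference<Thread> workerThread = new AtomicReference<>();

  void setWorker(Thread t) {
    workerThread.set(t);
  }

  void close() {
    Thread t = workerThread.getAndSet(null);
    if (t != null) { // exactly one racing close() call gets past this check
      t.interrupt();
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}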

@lberki (Contributor Author) commented Apr 29, 2024

Update: I tried to use AtomicReference for state.workerThread and failed at the last step -- it turns out that with test_keep_going_weird_deadlock(), we have a case where a single state instance can have multiple worker threads, and I couldn't get that case to work with AtomicReference in a way that was obviously correct.

I think it still all works out because Thread.join() is idempotent (you can join a single thread multiple times just fine), but it makes me a bit less comfortable. However, if we want to handle that case cleanly, it's better done in a separate change; this one is complicated enough as it is.

@lberki (Contributor Author) commented May 3, 2024

Discarding in favor of #22215 and #22100.

@lberki closed this May 3, 2024
@lberki deleted the lberki-repoworker-thread branch May 3, 2024 06:29
@github-actions bot removed the awaiting-review (PR is awaiting review from an assigned reviewer) label May 3, 2024