Use a Semaphore for signaling in repo fetching #22100

Closed
wants to merge 3 commits

Conversation

Wyverald
Member

@Wyverald Wyverald commented Apr 23, 2024

I managed to reproduce some deadlocks during repo fetching with virtual worker threads. One notable trigger was some other repo failing to fetch, which seems to cause Skyframe to try to interrupt other concurrent repo fetches. This might be the cause of a deadlock where we submit a task to the worker executor service, but the task is cancelled before it ever starts running, which causes us to wait forever for a DONE signal that never comes. (The worker task puts a DONE signal in the queue in a finally block -- but we don't even enter the try.)

This PR improves the situation in various ways:

  1. Instead of using a SynchronousQueue for the signal queue, we now use a Semaphore for signaling. Semaphores have the crucial property that releasing a permit (i.e. incrementing the counter) does not block, and thus cannot be interrupted. This means that the worker thread can now reliably send signals to the host thread, even when it's interrupted.

  2. Instead of using two signals for DONE and RESTART, we just use the one semaphore for both signals, and rely on workerFuture.isDone() to tell whether the worker has finished or is waiting for a fresh Environment.

  3. Instead of signaling DONE in a finally block, we now use a ListenableFuture and signal to the semaphore in the worker future's listener. This makes sure that the signaling is performed after the worker future's status changes, and safeguards against the case where the submitted task is cancelled before it ever starts running. (Note that points 2 & 3 aren't the only way to handle this -- we could alternatively just use two semaphores.)

  4. Instead of waiting for a DONE signal (or, in the new setup, the signal semaphore) to make sure the worker thread has finished, we now hold on to the executor service, which offers a close() method that essentially uninterruptibly waits for any scheduled tasks to terminate, whether or not they have started running. (@justinhorvitz had suggested a similar idea before.) To make sure distinct repo fetches don't interfere with each other, we start a separate worker executor service for each repo fetch instead of making everyone share the same worker executor service. (This is recommended for virtual threads; see https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-C0FEE349-D998-4C9D-B032-E01D06BE55F2 for example.)

And because I now create a separate worker executor service for each repo fetch, it doesn't really make sense to use this for platform threads anymore. So setting --experimental_worker_for_repo_fetching to any value other than off will cause us to use virtual threads.
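For illustration, here's a minimal sketch of the resulting pattern (names are illustrative, not the actual Bazel code; assumes JDK 21 virtual threads and Guava's ListenableFuture):

```java
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

class RepoFetchSketch {
  String fetch() throws Exception {
    Semaphore signal = new Semaphore(0);
    // A fresh executor per repo fetch, as recommended for virtual threads.
    ExecutorService raw = Executors.newVirtualThreadPerTaskExecutor();
    ListeningExecutorService executor = MoreExecutors.listeningDecorator(raw);
    ListenableFuture<String> workerFuture =
        executor.submit(
            () -> {
              // ... the actual fetch; calls signal.release() whenever it
              // needs a fresh Environment (the old RESTART signal) ...
              return "fetch result";
            });
    // The listener runs after the future's status changes, even if the task
    // is cancelled before it ever starts -- and release() never blocks.
    workerFuture.addListener(signal::release, directExecutor());

    signal.acquire(); // woken on either a restart request or completion...
    if (!workerFuture.isDone()) {
      return null; // ...not done: worker wants a fresh Environment (restart)
    }
    try {
      return workerFuture.get(); // done: the fetch result, or an exception
    } finally {
      raw.close(); // uninterruptibly waits for the worker, started or not
    }
  }
}
```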

Related: #22003

Fixes #21712.

@github-actions github-actions bot added team-ExternalDeps External dependency handling, remote repositories, WORKSPACE file. awaiting-review PR is awaiting review from an assigned reviewer labels Apr 23, 2024
@Wyverald
Member Author

cc @meteorcloudy @coeuvre

@lberki
Contributor

lberki commented Apr 24, 2024

Now that platform threads are not used anymore, why do we even need an ExecutorService and monkey around with futures? I could imagine an approach where we'd directly store a virtual thread in RepoFetchingSkyKeyComputeState and join() it in a suitably placed finally block; in case an early exit is needed, we can call Thread.interrupt() on it.

Granted, that thread could still swallow interrupts, but at least there would be fewer moving parts: that way, no questions would arise about whether we are using the ExecutorService correctly, and it doesn't bring anything useful to the table anyway, because we only ever pass one task to it. Roughly the shape I have in mind is sketched below.
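A sketch only; the field and method names here are invented, not the actual Bazel code:

```java
import java.util.concurrent.atomic.AtomicReference;

class RepoFetchingSkyKeyComputeState {
  private final AtomicReference<Thread> workerThread = new AtomicReference<>();

  void startWorker(Runnable fetch) {
    workerThread.set(Thread.ofVirtual().name("starlark-repo-fetcher").start(fetch));
  }

  // Called from a suitably placed finally block; also the early-exit path.
  void close() throws InterruptedException {
    Thread t = workerThread.getAndSet(null);
    if (t != null) {
      t.interrupt(); // ask the worker to stop...
      t.join();      // ...and wait until it actually has
    }
  }
}
```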

@Wyverald
Member Author

Now that platform threads are not used anymore, why do we even need an ExecutorService and monkey around with futures?

that's a good point -- the executor service really has no value anymore, so I'll probably just switch to a Thread. The future is still nice as it encapsulates the result [success (a value), failure (an exception), cancellation etc] quite nicely, so I'll probably still use it, but just as a SettableFuture or something.

anyhow -- watch this space!

@Wyverald
Member Author

Wyverald commented Apr 24, 2024

Now that platform threads are not used anymore, why do we even need an ExecutorService and monkey around with futures?

that's a good point -- the executor service really has no value anymore, so I'll probably just switch to a Thread. The future is still nice as it encapsulates the result [success (a value), failure (an exception), cancellation etc] quite nicely, so I'll probably still use it, but just as a SettableFuture or something.

Okay, I gave up. (EDIT: to be clear, I gave up on switching to Thread, not this PR.) The failed attempt is at #22110 if anyone's interested. It turns out using an executor service for a single task is still valuable since it manages the lifecycle of the underlying thread (with some latches etc). Directly using a thread while also trying to communicate the result/exception back to the host thread turned out to be rather tricky; in particular, while trying to avoid a case where the worker thread dies without emitting a signal, I always fell into some other deadlock (see failed PR for comments).

It seems to me that there's always some tension between the signal queue and the actual lifecycle of the worker thread. The host thread needs to listen on the signal queue while also watching out for the worker thread dying before emitting a signal. Even with the executor service approach, I'm not confident that we're free of deadlocks; for example, if the worker is interrupted due to memory pressure, the host will probably wait forever to take something from the signal queue until it gets interrupted by something.

It almost feels like we need some sort of channel select mechanism -- upon any of 1) worker interrupted, 2) host interrupted, 3) worker emits signal, we need to take some action.

@lberki
Contributor

lberki commented Apr 25, 2024

I think I have an explanation for the deadlock you found on #22110. You interrupt the worker thread, then the InterruptedException is caught in the catch (Throwable e) in line 160, then it'll try to put something on the signal queue in the finally clause, but there is no one there to take that thing from the signal queue, so it's a deadlock.

I can imagine two solutions to this:

  1. Don't use the BlockingQueue to report that the thread is done in case an interrupt is received. This would work because even if the worker thread wants to put something in the queue when the interrupt hits it, the interrupt will unblock it. So doing nothing in the catch (InterruptedException e) {} branch is safe. This probably only works if the only thing that can interrupt the worker thread is the Skyframe thread. (Can a virtual thread get interrupted in any way other than by Thread.interrupt()? Maybe a SIGINT to its carrier thread, whatever it may be?)

  2. Use an unbounded queue (so that it never blocks) instead of a SynchronousQueue. This would get rid of the synchronization point between .put() and .take() and make the code more resilient to cases you haven't anticipated, and you could e.g. (see the sketch after this list):

    1. Make the worker thread emit an "I am done" message right before the thread finishes
    2. When the Skyframe thread is interrupted, interrupt the worker thread, then drain the queue until you get the "I am done" message, then join the worker thread. FWIW, we use this approach in quite a few places in the code base (look for POISON_PILL, all caps), although I'm not sure if there is bidirectional communication in any of these places.
    3. Otherwise, the Skyframe thread would get a message from the worker thread and if it's an unexpected "I am done" one, it knows that the worker thread unexpectedly died and can at least report an error instead of deadlocking.
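A sketch of what (2) could look like (invented names; DONE doubles as the poison pill):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillSketch {
  enum Signal { RESTART, DONE }

  // Unbounded, so add() never blocks: no put()/take() synchronization point.
  final BlockingQueue<Signal> signals = new LinkedBlockingQueue<>();

  // Worker thread: emit the "I am done" message right before finishing,
  // no matter how the fetch ended.
  void workerBody(Runnable fetch) {
    try {
      fetch.run();
    } finally {
      signals.add(Signal.DONE);
    }
  }

  // Skyframe thread, on interrupt: interrupt the worker, drain the queue
  // until the poison pill shows up, then join the worker thread.
  void onHostInterrupt(Thread worker) throws InterruptedException {
    worker.interrupt();
    while (signals.take() != Signal.DONE) {
      // discard stale RESTART messages
    }
    worker.join();
  }
}
```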

I think I'd do (2), for the very practical reason that you have been carefully orchestrating a SynchronousQueue and it didn't work. But in either case, I think thinking through all the possible sequences of events is not optional...

@lberki
Contributor

lberki commented Apr 25, 2024

I take the "unbounded queue" idea back. The only case where a deadlock should happen is when one thread is waiting in .put() / .take() and the other doesn't ever get there and it looks like it's reasonably simple to avoid that: in particular, I think "poison pill" approach to signal from one side that it should not expect any more updates from the other is pretty sound and easy to grok.

@Wyverald
Member Author

then it'll try to put something on the signal queue in the finally clause, but there is no one there to get that thing from the signal queue, so it's a deadlock.

There is no finally clause? Also note that an interrupt is sent to the worker thread right before the host blocks on the join.

This probably only works if the only thing that can interrupt the worker thread is the Skyframe thread

This is not the case, unfortunately; Skyframe can close() the state object on high memory pressure, interrupting the worker thread.

Make the worker thread emit an "I am done" message right before the thread finishes

in particular, I think "poison pill" approach to signal from one side that it should not expect any more updates from the other is pretty sound and easy to grok.

Isn't that what we're doing already? The DONE signal is that poison pill. It's just really hard to enforce that a DONE is put on the queue, because the "put" itself may be interrupted. If you do it with Uninterruptibles.putUninterruptibly, you might then get a deadlock if the host thread was already interrupted. "Draining the queue" doesn't make sense for a synchronous queue (unless you mean to use a size-1 queue or something).
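To spell out that putUninterruptibly deadlock, a runnable sketch (hypothetical names, not the actual code):

```java
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.SynchronousQueue;

class PutUninterruptiblyDeadlock {
  enum Signal { DONE }

  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<Signal> queue = new SynchronousQueue<>();
    Thread worker =
        Thread.ofVirtual()
            .start(
                () -> {
                  // Worker insists on delivering DONE no matter what; on a
                  // SynchronousQueue this blocks until a taker shows up...
                  Uninterruptibles.putUninterruptibly(queue, Signal.DONE);
                });
    // ...but the host was already interrupted (say, by memory pressure) and
    // never calls queue.take(), so the worker blocks forever.
    worker.join(1_000); // times out instead of joining
    System.out.println("worker still alive: " + worker.isAlive()); // true
  }
}
```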

@lberki
Contributor

lberki commented Apr 26, 2024

My apologies; instead of "finally clause", I should have said catch (Throwable e).

I took your attempt and ran with it and created #22139. WDYT? At the time I'm writing this, test_no_restarts_fetching_with_worker_thread still fails, but at least there are no deadlocks, which is a good start :) By the time you wake up, I hope I'll be able to polish it up to code.

@justinhorvitz justinhorvitz removed their request for review April 30, 2024 21:41
@justinhorvitz
Contributor

Since you have feedback from other reviewers, and I have no experience with virtual threads, I removed myself as a reviewer. Please let me know if you want me to take a look at anything in particular though.

@Wyverald Wyverald changed the title from "Hold on to the worker executor service in RepoFetching" to "Use a Semaphore for signaling in repo fetching" May 1, 2024
@Wyverald
Member Author

Wyverald commented May 1, 2024

I finally found a solution I'm happy with! Please see the updated PR description. This new approach using Semaphores is IMO pretty clean and easy to understand.


@lberki lberki left a comment


Out of all the solutions, I like this one the best. As appealing as my Thread-based solution is, it somehow always ends up being more complicated than necessary. This one arguably has a lot of moving parts, but at least not a lot of code, which is a win.

@Override
public void close() {
  var myWorkerFuture = workerFuture;
  workerFuture = null;
  if (myWorkerFuture != null) {
    myWorkerFuture.cancel(true);
  }
  workerExecutorService.shutdownNow();
Contributor


nit: doesn't shutdownNow() cancel all pending work?

Member Author


Yes, it does. If you're asking why I call both shutdownNow() and future.cancel(true): per my reading of the code, shutdownNow() interrupts the worker thread, but does not technically cancel the future. So the future would end with a state of "finished exceptionally" with an InterruptedException. I think it's better to be consistent here and actually mark the future as canceled.

Contributor


TIL. Out of curiosity, did you experimentally verify this? It makes some weird sort of sense, but AFAIU each task an ExecutorService runs is wrapped in a future, so what would be the point of interrupting a task without cancelling the associated future?

Member Author


I haven't experimentally verified this. But anyhow, the ThreadPerTaskExecutor does not hold on to the Futures it returns from its submit() method; it also has an execute() method that doesn't create a Future at all.
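(If anyone wants to check experimentally, a quick sketch like this should do it -- plain JDK 21, nothing Bazel-specific; the "expected" comments reflect my reading of the code, not verified output:)

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ShutdownNowVsCancel {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
    Future<Object> future =
        pool.submit(
            () -> {
              Thread.sleep(60_000); // parked until shutdownNow() interrupts
              return null;
            });
    Thread.sleep(100); // give the task a chance to start running
    pool.shutdownNow(); // interrupts the worker thread
    pool.awaitTermination(5, TimeUnit.SECONDS);
    // The future should end up "finished exceptionally", not cancelled:
    System.out.println(future.isCancelled()); // expected: false
    System.out.println(future.state());      // expected: FAILED
  }
}
```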

// Unless we know the worker is waiting on a fresh Environment, we should *always* shut down
// the worker executor and reset the state by the time we finish executing (successfully or
// otherwise). This ensures that 1) no background work happens without our knowledge, and
// 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and
Contributor


I assume that in this case, the return value of this SkyFunction will be null, but how? How does the worker thread guarantee that in that case, it will return null?

Member Author


I assume that in this case, the return value of this SkyFunction will be null, but how?

It's not null IIUC. For b/330892334 it'll presumably throw on the newly-discovered already-eval'd failing env.getValue(); for #21238 it's error-bubbling, so it should throw some previously thrown exception (or "bubble up", whatever that may mean).

@lberki
Contributor

lberki commented May 3, 2024

I added some comments, but I think that they are just asking for reassurance and the code works as it is today.

I propose the following course of action:

  1. I approve this pull request
  2. @Wyverald takes a look at my comments to see if they indicate any actual issues
  3. If not, he merges it
  4. I drop "Use a Thread instead of an ExecutorService for Starlark repository workers" (#22139) and "Rewrite repository worker threads using message passing" (#22191) since they are strictly inferior to "Rewrite Starlark repository workers using an RPC paradigm..." (#22215)
  5. We continue iterating on "Rewrite Starlark repository workers using an RPC paradigm..." (#22215)

@Wyverald Wyverald added awaiting-PR-merge PR has been approved by a reviewer and is ready to be merged internally and removed awaiting-review PR is awaiting review from an assigned reviewer labels May 3, 2024
@copybara-service copybara-service bot closed this in eb905bd May 4, 2024
@github-actions github-actions bot removed the awaiting-PR-merge PR has been approved by a reviewer and is ready to be merge internally label May 4, 2024
@Wyverald Wyverald deleted the wyv-repoworker branch May 6, 2024 18:45
Wyverald added a commit that referenced this pull request May 6, 2024
Kila2 pushed a commit to Kila2/bazel that referenced this pull request May 13, 2024
Labels
team-ExternalDeps External dependency handling, remote repositories, WORKSPACE file.

Successfully merging this pull request may close these issues.

Hang in 'checking cached actions'