Allow BoundedExecutor to recover from failures #742
base: master
Previously submitted tasks will eventually run if the underlying executor recovers and additional tasks are submitted.
try {
coreExecutor.execute(this::drainQueue);
}
catch (Throwable e) {
failed.set(true);
log.error("BoundedExecutor state corrupted due to underlying executor failure");
decrementAndGetQueueSize();
If coreExecutor.execute(this::drainQueue) throws immediately, you can end up with 1 task in the queue and queueSize=0.
To me, it looks like you want something like "current thread count" instead of "queue size". The question is how to do this without losing an incoming task.
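To make the concern concrete, here is a minimal sketch of the bookkeeping, not the actual BoundedExecutor code. The class name, the field layout, the `queueSize <= maxThreads` check, and the always-rejecting executor are assumptions made for illustration; only the rollback in the catch block mirrors the diff.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the class under review; names are illustrative.
class RejectedSubmitSketch {
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger queueSize = new AtomicInteger();
    private final Executor coreExecutor;
    private final int maxThreads;

    RejectedSubmitSketch(Executor coreExecutor, int maxThreads) {
        this.coreExecutor = coreExecutor;
        this.maxThreads = maxThreads;
    }

    void execute(Runnable task) {
        queue.add(task);
        if (queueSize.incrementAndGet() <= maxThreads) {
            try {
                coreExecutor.execute(this::drainQueue);
            }
            catch (Throwable e) {
                // As in the diff: the counter is rolled back,
                // but the task stays in the queue.
                queueSize.decrementAndGet();
            }
        }
    }

    // Assumed drain loop; never reached in demo() because submission fails.
    private void drainQueue() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    // Returns {tasks left in the queue, value of the counter}.
    static int[] demo() {
        Executor alwaysRejects = command -> {
            throw new RejectedExecutionException("simulated failure");
        };
        RejectedSubmitSketch sketch = new RejectedSubmitSketch(alwaysRejects, 1);
        sketch.execute(() -> {});
        return new int[] {sketch.queue.size(), sketch.queueSize.get()};
    }
}
```

Under these assumptions, `demo()` leaves one task queued while the counter reads 0, which is the mismatch described above.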
catch (Throwable e) {
log.error(e, "Task failed");

if ((decrementAndGetQueueSize() == 0) && (task == null)) {
Unnecessary parentheses.
log.error(e, "Task failed");

if ((decrementAndGetQueueSize() == 0) && (task == null)) {
return;
With maxThreads=1, consider:
- thread A calls execute
  a. adds task
  b. queueSize := 1
  c. starts a thread A'
- thread A' removes the task from the queue and executes it
- thread A' calls decrementAndGetQueueSize, i.e. queueSize := 0, but does not exit because task was not null
- thread B (or A) calls execute
  a. adds task
  b. queueSize := 1
  c. starts a thread B', because queueSize <= maxThreads
- thread C calls execute
  a. adds task
  b. queueSize := 2
  c. doesn't start a thread
At this point, 2 threads, A' and B', are executing within coreExecutor even though maxThreads is 1, and both will attempt to poll & execute the next task.
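The interleaving above can be replayed deterministically in a single thread. This is only a model of the steps as described in this comment, not the real class: `submit`, `workerStep`, and `liveWorkers` are hypothetical names, and plain ints stand in for the atomic counter.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Single-threaded replay of the scenario; all names are illustrative.
class InterleavingSketch {
    static final int MAX_THREADS = 1;

    final Queue<String> queue = new ArrayDeque<>();
    int queueSize;    // models the counter behind decrementAndGetQueueSize()
    int liveWorkers;  // workers currently inside coreExecutor

    // Models execute(): enqueue, bump the counter, start a worker if allowed.
    void submit(String task) {
        queue.add(task);
        queueSize++;
        if (queueSize <= MAX_THREADS) {
            liveWorkers++;
        }
    }

    // Models one pass of a worker using the condition from the diff.
    void workerStep() {
        String task = queue.poll();
        // (the task would run here)
        queueSize--;
        if (queueSize == 0 && task == null) {
            liveWorkers--; // worker exits
        }
    }

    static int replay() {
        InterleavingSketch s = new InterleavingSketch();
        s.submit("t1");  // thread A: queueSize 1, starts A'
        s.workerStep();  // A' runs t1: queueSize 0, stays alive (task != null)
        s.submit("t2");  // thread B: queueSize 1, starts B'
        s.submit("t3");  // thread C: queueSize 2, no new worker
        return s.liveWorkers;
    }
}
```

In this model, `replay()` ends with two live workers even though `MAX_THREADS` is 1.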
(updated)
But if we rearrange the if condition to task == null && decrementAndGetQueueSize() == 0, then thread B' won't be created, because queueSize is decremented (and can become 0) only if task == null.
Or maybe we can peek into the queue to check for the next task: decrementAndGetQueueSize() == 0 && queue.peek() == null
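The difference between the two orderings comes down to `&&` short-circuiting. A small sketch, with hypothetical method names and an `AtomicInteger` standing in for the counter:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only; the empty if-bodies mark where the worker would return.
class ConditionOrderSketch {
    // Order used in the diff: the decrement always runs.
    static int originalOrder(Object task, AtomicInteger queueSize) {
        if (queueSize.decrementAndGet() == 0 && task == null) {
            // worker would return here
        }
        return queueSize.get();
    }

    // Suggested order: the decrement runs only when task == null.
    static int suggestedOrder(Object task, AtomicInteger queueSize) {
        if (task == null && queueSize.decrementAndGet() == 0) {
            // worker would return here
        }
        return queueSize.get();
    }
}
```

With a non-null task and a counter of 1, the original order drops the counter to 0 while the suggested order leaves it at 1. Note that the suggested order then never decrements for tasks that were actually executed, so this sketch says nothing about whether the counter still tracks the queue afterwards.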
@electrum: to achieve the desired behavior, we really only have 3 possible options here:
There is no way to make this magically work without one of these three options. For me, I would suggest (2).