New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1264] feat(spark): support cancel async rpc when kill or interrupt task #1265
base: master
Are you sure you want to change the base?
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1265 +/- ##
============================================
+ Coverage 54.12% 55.20% +1.08%
- Complexity 2516 2671 +155
============================================
Files 383 376 -7
Lines 21618 20707 -911
Branches 1841 1972 +131
============================================
- Hits 11700 11431 -269
+ Misses 9186 8577 -609
+ Partials 732 699 -33
... and 35 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@LuciferYang Could you help me review this pr? |
@@ -304,56 +306,54 @@ protected List<CompletableFuture<Long>> postBlockEvent( | |||
LOG.error("Add event " + event + " to finishEventQueue fail"); | |||
} | |||
}); | |||
futures.add(shuffleManager.sendData(event)); | |||
CompletableFuture<Long> longCompletableFuture = shuffleManager.sendData(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to outstandingDataSentFuture
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) { | |||
return Option.empty(); | |||
} | |||
} finally { | |||
// cancel all async thread task related | |||
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the task held by this future is still in the queue of dataPusher, the cancel operation will not be valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if the task is running of pushing data to shuffle server, the interrupt signal caused by the operation of future.cancel
will be accepted and then exit quickly, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the task held by this future is still in the queue of dataPusher, the cancel operation will not be valid?
It will mark the result of the runnable, when the implementation AsyncSupply
of runnable is executed, since the result is set, directtly skip the execution of the actual Supplier
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if the task is running of pushing data to shuffle server, the interrupt signal caused by the operation of
future.cancel
will be accepted and then exit quickly, right?
In fact, the signal will not be passed to the executing thread, i'll change the CompletableFuture
to FutureTask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you resolve the conflict? @summaryzb
@@ -89,6 +89,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { | |||
private final boolean isMemoryShuffleEnabled; | |||
private final Function<String, Boolean> taskFailureCallback; | |||
private final Set<Long> blockIds = Sets.newConcurrentHashSet(); | |||
private final Set<CompletableFuture> sendingSet = Sets.newConcurrentHashSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set<CompletableFuture<Long>>
?
} | ||
if (finishEventQueue.isEmpty()) { | ||
remainingMs = Math.max(end - System.currentTimeMillis(), 0); | ||
Object event = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Object event = null; | |
Object event; |
@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) { | |||
return Option.empty(); | |||
} | |||
} finally { | |||
// cancel all async thread task related | |||
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true)); | |
sendingSet.forEach(eventTask -> eventTask.cancel(true)); |
@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) { | |||
return Option.empty(); | |||
} | |||
} finally { | |||
// cancel all async thread task related | |||
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is cancel
certain to succeed? If cancel
returns false, what negative impact will it have?
@@ -120,8 +126,17 @@ public static boolean waitUntilDoneOrFail( | |||
|
|||
try { | |||
allFutures.get(10, TimeUnit.MILLISECONDS); | |||
} catch (TimeoutException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can merge these two Exception
Do we still need this PR? @summaryzb @zuston @LuciferYang |
What changes were proposed in this pull request?
Why are the changes needed?
#1264
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Integration test