Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1264] feat(spark): support cancel async rpc when kill or interrupt task #1265

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

summaryzb
Copy link
Contributor

@summaryzb summaryzb commented Oct 26, 2023

What changes were proposed in this pull request?

  1. Cancel all the runnable that are wait to be executed or blocked in waiting for rpc callback
  2. Interrupt checkBlockSendResult immediately

Why are the changes needed?

#1264

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Integration test

@codecov-commenter
Copy link

codecov-commenter commented Oct 26, 2023

Codecov Report

Merging #1265 (caf0552) into master (931d6cd) will increase coverage by 1.08%.
The diff coverage is 54.54%.

@@             Coverage Diff              @@
##             master    #1265      +/-   ##
============================================
+ Coverage     54.12%   55.20%   +1.08%     
- Complexity     2516     2671     +155     
============================================
  Files           383      376       -7     
  Lines         21618    20707     -911     
  Branches       1841     1972     +131     
============================================
- Hits          11700    11431     -269     
+ Misses         9186     8577     -609     
+ Partials        732      699      -33     
Files Coverage Δ
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 35.55% <33.33%> (-0.15%) ⬇️
...va/org/apache/uniffle/client/util/ClientUtils.java 60.00% <62.50%> (-1.71%) ⬇️

... and 35 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@summaryzb summaryzb changed the title [WIP][#1264] feat(spark): support cancel async rpc when kill or interrupt task [#1264] feat(spark): support cancel async rpc when kill or interrupt task Oct 27, 2023
@jerqi
Copy link
Contributor

jerqi commented Oct 27, 2023

@LuciferYang Could you help me review this pr?

@@ -304,56 +306,54 @@ protected List<CompletableFuture<Long>> postBlockEvent(
LOG.error("Add event " + event + " to finishEventQueue fail");
}
});
futures.add(shuffleManager.sendData(event));
CompletableFuture<Long> longCompletableFuture = shuffleManager.sendData(event);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to outstandingDataSentFuture ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) {
return Option.empty();
}
} finally {
// cancel all async thread task related
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task held by this future is still in the queue of dataPusher, the cancel operation will not be valid?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if the task is running of pushing data to shuffle server, the interrupt signal caused by the operation of future.cancel will be accepted and then exit quickly, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task held by this future is still in the queue of dataPusher, the cancel operation will not be valid?

It will mark the result of the runnable, when the implementation AsyncSupply of runnable is executed, since the result is set, directtly skip the execution of the actual Supplier function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if the task is running of pushing data to shuffle server, the interrupt signal caused by the operation of future.cancel will be accepted and then exit quickly, right?

In fact, the signal will not be passed to the executing thread, i'll change the CompletableFuture to FutureTask

Copy link
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you resolve the conflict? @summaryzb

@@ -89,6 +89,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final boolean isMemoryShuffleEnabled;
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private final Set<CompletableFuture> sendingSet = Sets.newConcurrentHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set<CompletableFuture<Long>>?

}
if (finishEventQueue.isEmpty()) {
remainingMs = Math.max(end - System.currentTimeMillis(), 0);
Object event = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Object event = null;
Object event;

@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) {
return Option.empty();
}
} finally {
// cancel all async thread task related
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true));
sendingSet.forEach(eventTask -> eventTask.cancel(true));

@@ -442,6 +442,8 @@ public Option<MapStatus> stop(boolean success) {
return Option.empty();
}
} finally {
// cancel all async thread task related
sendingSet.stream().forEach(eventTask -> eventTask.cancel(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is cancel certain to succeed? If cancel returns false, what negative impact will it have?

@@ -120,8 +126,17 @@ public static boolean waitUntilDoneOrFail(

try {
allFutures.get(10, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge these two Exception

@rickyma
Copy link
Contributor

rickyma commented Apr 16, 2024

Do we still need this PR? @summaryzb @zuston @LuciferYang

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants