[#1579][part-1] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry #1584

Open
wants to merge 3 commits into
base: master

Member

@zuston zuston commented Mar 15, 2024

What changes were proposed in this pull request?

Clear out the previous stage attempt's data synchronously when registering the reassigned shuffleIds.

Why are the changes needed?

Fix: #1579

If the previous stage attempt's data is still in the purge queue on the shuffle-server side, writes from the retried stage can fail with unknown exceptions, so we had better clear out all data from the previous stage attempt before re-registering.

This PR synchronously removes the previous stage's data when the first attempt writer is initialized.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Member Author

zuston commented Mar 15, 2024

cc @dingshun3016 @yl09099 PTAL


github-actions bot commented Mar 15, 2024

Test Results

 2 419 files  ±0   2 419 suites  ±0   4h 58m 9s ⏱️ +16s
   933 tests ±0     932 ✅ ±0   1 💤 ±0  0 ❌ ±0 
10 819 runs  ±0  10 805 ✅ ±0  14 💤 ±0  0 ❌ ±0 

Results for commit 3dd6b34. ± Comparison against base commit a0e88da.

♻️ This comment has been updated with latest results.

Member Author

zuston commented Mar 15, 2024

After rethinking this, I think reassignAllShuffleServersForWholeStage could be invoked by the retry writer rather than by the previously failed writer. That would ensure that no older data reaches the server after re-registration.

leslizhang pushed a commit to leslizhang/incubator-uniffle that referenced this pull request Mar 19, 2024
@zuston zuston force-pushed the stageRetry2 branch 2 times, most recently from b192095 to 4d3a892 Compare March 22, 2024 08:25

codecov-commenter commented Mar 22, 2024

Codecov Report

Attention: Patch coverage is 3.29670%, with 352 lines in your changes missing coverage. Please review.

Project coverage is 53.42%. Comparing base (6f6d35a) to head (5c9d9e3).
Report is 34 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...uniffle/shuffle/manager/RssShuffleManagerBase.java | 0.00% | 187 Missing ⚠️ |
| .../shuffle/handle/StageAttemptShuffleHandleInfo.java | 0.00% | 43 Missing ⚠️ |
| ...pache/uniffle/server/ShuffleServerGrpcService.java | 0.00% | 32 Missing ⚠️ |
| .../apache/spark/shuffle/RssStageResubmitManager.java | 0.00% | 22 Missing ⚠️ |
| ...spark/shuffle/handle/MutableShuffleHandleInfo.java | 0.00% | 22 Missing ⚠️ |
| ...niffle/server/netty/ShuffleServerNettyHandler.java | 0.00% | 9 Missing ⚠️ |
| ...ffle/client/request/RssRegisterShuffleRequest.java | 0.00% | 7 Missing ⚠️ |
| ...fle/shuffle/manager/ShuffleManagerGrpcService.java | 0.00% | 6 Missing ⚠️ |
| ...ffle/client/impl/grpc/ShuffleServerGrpcClient.java | 0.00% | 6 Missing ⚠️ |
| ...ffle/client/request/RssSendShuffleDataRequest.java | 0.00% | 5 Missing ⚠️ |

... and 6 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1584      +/-   ##
============================================
- Coverage     54.86%   53.42%   -1.45%     
- Complexity     2358     2943     +585     
============================================
  Files           368      435      +67     
  Lines         16379    23768    +7389     
  Branches       1504     2208     +704     
============================================
+ Hits           8986    12697    +3711     
- Misses         6862    10290    +3428     
- Partials        531      781     +250     


@zuston zuston changed the title [#1579] fix(spark): clear out previous stage attempt data synchronously [#1579] fix(spark): Adjust reassgin time to avoid failure to clean up previous stage data Mar 22, 2024
Contributor

jerqi commented Mar 22, 2024

It's dangerous to delete the failed stage's data when we retry. It's hard to reach a state in which the data can be deleted safely. We should rely on data skipping to avoid reading the failed data.

Member Author

zuston commented Mar 22, 2024

> It's dangerous to delete the failed stage's data when we retry. It's hard to reach a state in which the data can be deleted safely.

Could you describe more?

Contributor

jerqi commented Mar 23, 2024

> It's dangerous to delete the failed stage's data when we retry. It's hard to reach a state in which the data can be deleted safely.
>
> Could you describe more?

Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Even though we resubmit the stage, tasks from the last attempt may keep writing. Spark doesn't guarantee that all tasks from the last attempt have ended by the time the newest attempt starts.

Contributor

jerqi commented Mar 25, 2024

@EnricoMi If a stage is retried, the taskId may not be unique, because we don't have a stage attemptId to distinguish task 1 attempt 0 in stage attempt 0 from task 1 attempt 0 in stage attempt 1. This may cause us to read the wrong data.

Member Author

zuston commented Mar 25, 2024

> It's dangerous to delete the failed stage's data when we retry. It's hard to reach a state in which the data can be deleted safely.
>
> Could you describe more?
>
> Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Even though we resubmit the stage, tasks from the last attempt may keep writing. Spark doesn't guarantee that all tasks from the last attempt have ended by the time the newest attempt starts.

If so, we had better reject shuffle data from the older version. This could be implemented by maintaining the latest stage attempt id.

Contributor

jerqi commented Mar 25, 2024

> It's dangerous to delete the failed stage's data when we retry. It's hard to reach a state in which the data can be deleted safely.
>
> Could you describe more?
>
> Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Even though we resubmit the stage, tasks from the last attempt may keep writing. Spark doesn't guarantee that all tasks from the last attempt have ended by the time the newest attempt starts.
>
> If so, we had better reject shuffle data from the older version. This could be implemented by maintaining the latest stage attempt id.

OK, maybe rejecting the legacy data will be a better choice.

Contributor

jerqi commented Mar 25, 2024

> @EnricoMi If a stage is retried, the taskId may not be unique, because we don't have a stage attemptId to distinguish task 1 attempt 0 in stage attempt 0 from task 1 attempt 0 in stage attempt 1. This may cause us to read the wrong data.

Ignore this. Maybe rejecting legacy data will be a better choice.

@@ -158,6 +158,30 @@ public void registerShuffle(
String remoteStoragePath = req.getRemoteStorage().getPath();
String user = req.getUser();

if (req.getIsStageRetry()) {
Contributor

If removeShuffleDataSync is always called, we can avoid plumbing isStageRetry through here. When isStageRetry == false, the call is a no-op.

removeShuffleDataSync could return true if it found data to delete, so we can log the message below conditionally.
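
As a rough illustration of this suggestion, the sketch below always attempts the cleanup and logs only when something was actually removed. `ShuffleDataStore`, its in-memory map, and the single-argument `removeShuffleDataSync(int)` signature are hypothetical stand-ins, not the Uniffle server code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the server-side shuffle store.
class ShuffleDataStore {
  private final Map<Integer, Long> bytesByShuffleId = new ConcurrentHashMap<>();

  void write(int shuffleId, long bytes) {
    bytesByShuffleId.merge(shuffleId, bytes, Long::sum);
  }

  /** Removes all data for the shuffle; returns true only if data was actually deleted. */
  boolean removeShuffleDataSync(int shuffleId) {
    return bytesByShuffleId.remove(shuffleId) != null;
  }

  /** Registration path: cleanup is unconditional, the message depends on its result. */
  String registerShuffle(int shuffleId) {
    if (removeShuffleDataSync(shuffleId)) {
      return "Removed stale data before re-registering shuffleId=" + shuffleId;
    }
    return "Registered shuffleId=" + shuffleId;
  }
}
```

With this shape the register request needs no extra flag: a first registration finds nothing to delete, and a re-registration reports that stale data was cleared.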

Member Author

I prefer keeping the isStageRetry param (or replacing it with the stage attempt number) for two reasons:

  1. It makes stage retry more explicit, especially when something goes wrong, such as the previous data having been purged because of an expired heartbeat. With this param, the log will indicate that something abnormal happened.
  2. In the next PR, I will introduce the latest stage attempt to discard data from older attempts.

Contributor

  1. all this plumbing for logging is peculiar
  2. maybe there are better mechanisms to discard older data

@zuston zuston changed the title [#1579] fix(spark): Adjust reassgin time to avoid failure to clean up previous stage data [#1579] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry Mar 26, 2024
@zuston zuston changed the title [#1579] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry [#1579][part-1] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry Mar 26, 2024
@zuston zuston requested review from EnricoMi and jerqi March 26, 2024 03:49
Member Author

zuston commented Mar 26, 2024

Could you help review this? @EnricoMi @jerqi The spark2 change will be finished once this PR looks OK to you.

Contributor

jerqi commented Mar 26, 2024

> Could you help review this? @EnricoMi @jerqi The spark2 change will be finished once this PR looks OK to you.

Several questions:

  1. How to reject the legacy requests?
  2. How to delete the legacy shuffle?

Member Author

zuston commented Mar 26, 2024

> 1. How to reject the legacy requests?

By using the latest attempt id on the server side to check whether a send request comes from an older attempt. This will be finished in the next PR.

> 2. How to delete the legacy shuffle?

This is covered in this PR.

@EnricoMi
Contributor

Can we register a shuffle as the tuple (shuffle_id, stage_attempt_id)? This way, we do not need to wait for (shuffle_id, 0) to be deleted synchronously, and can go on registering and writing (shuffle_id, 1). Deletion could take a significant time for large partitions (think TBs).
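
A minimal sketch of what keying by the tuple could look like, assuming an in-memory registry. `ShuffleAttemptKey` and `AttemptScopedRegistry` are hypothetical names introduced only for illustration and do not exist in Uniffle.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical composite key: (shuffle_id, stage_attempt_id).
final class ShuffleAttemptKey {
  final int shuffleId;
  final int stageAttemptId;

  ShuffleAttemptKey(int shuffleId, int stageAttemptId) {
    this.shuffleId = shuffleId;
    this.stageAttemptId = stageAttemptId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ShuffleAttemptKey)) return false;
    ShuffleAttemptKey other = (ShuffleAttemptKey) o;
    return shuffleId == other.shuffleId && stageAttemptId == other.stageAttemptId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(shuffleId, stageAttemptId);
  }
}

// Hypothetical registry in which each stage attempt owns its own buffer.
class AttemptScopedRegistry {
  private final Map<ShuffleAttemptKey, StringBuilder> data = new ConcurrentHashMap<>();

  void register(int shuffleId, int stageAttemptId) {
    data.putIfAbsent(new ShuffleAttemptKey(shuffleId, stageAttemptId), new StringBuilder());
  }

  /** Returns false when the (shuffle, attempt) pair was never registered. */
  boolean write(int shuffleId, int stageAttemptId, String block) {
    StringBuilder buffer = data.get(new ShuffleAttemptKey(shuffleId, stageAttemptId));
    if (buffer == null) return false;
    synchronized (buffer) {
      buffer.append(block);
    }
    return true;
  }

  String read(int shuffleId, int stageAttemptId) {
    StringBuilder buffer = data.get(new ShuffleAttemptKey(shuffleId, stageAttemptId));
    if (buffer == null) return null;
    synchronized (buffer) {
      return buffer.toString();
    }
  }
}
```

Because attempt 1 never shares a key with attempt 0, a late writer from attempt 0 cannot pollute the data of attempt 1, and the data of attempt 0 can be dropped whenever convenient.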

Contributor

EnricoMi commented Mar 26, 2024

I think deletion of earlier shuffle data should not be synchronous in the first place! That is flawed by design. Think of TBs of shuffle data. It should be removed quickly, in constant time (e.g. HDFS move), and cleaned up asynchronously (e.g. HDFS delete).
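
The move-then-delete idea can be sketched on a local filesystem with `java.nio.file` standing in for HDFS. `AsyncShuffleCleaner`, `retire`, and the trash-directory layout are assumptions made for this example; a rename is only cheap when source and target live on the same filesystem.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

// Hypothetical cleaner: rename the directory out of the way, delete it in the background.
class AsyncShuffleCleaner {
  private final ExecutorService cleaner =
      Executors.newSingleThreadExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "shuffle-cleaner");
            thread.setDaemon(true); // do not keep the JVM alive for cleanup
            return thread;
          });

  /** Frees shuffleDir right away via a rename; the returned future completes after deletion. */
  CompletableFuture<Void> retire(Path shuffleDir, Path trashRoot) throws IOException {
    Files.createDirectories(trashRoot);
    Path trashed = trashRoot.resolve(shuffleDir.getFileName() + "-" + System.nanoTime());
    Files.move(shuffleDir, trashed); // a rename when both paths share a filesystem
    return CompletableFuture.runAsync(() -> deleteRecursively(trashed), cleaner);
  }

  private static void deleteRecursively(Path root) {
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order visits children before their parent directory.
      paths.sorted(Comparator.reverseOrder())
          .forEach(
              path -> {
                try {
                  Files.delete(path);
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The caller can re-register and write to the original path as soon as `retire` returns, without waiting for the future.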

Member Author

zuston commented Mar 26, 2024

> Can we register a shuffle as the tuple (shuffle_id, stage_attempt_id)? This way, we do not need to wait for (shuffle_id, 0) to be deleted synchronously, and can go on registering and writing (shuffle_id, 1). Deletion could take a significant time for large partitions (think TBs).

I agree with you. I'm concerned about the cost of the refactor.

@@ -184,6 +184,7 @@ message ShuffleRegisterRequest {
string user = 5;
DataDistribution shuffleDataDistribution = 6;
int32 maxConcurrencyPerPartitionToWrite = 7;
int32 stageAttemptNumber = 8;
Contributor

@jerqi jerqi May 15, 2024

How do we reject legacy data? Writers of legacy data won't send a register request.

Member Author

Legacy requests could be rejected according to the attemptNumber carried in the sendShuffleData and reportShuffleResult RPCs.
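
One way this check might look on the server side is sketched below. `StageAttemptGuard` and its method names are hypothetical; the sketch assumes that every write or report request carries the stage attempt number of its sender.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard that remembers the newest registered attempt per shuffle.
class StageAttemptGuard {
  private final Map<Integer, Integer> latestAttemptByShuffleId = new ConcurrentHashMap<>();

  /** Called on register; keeps the highest attempt number seen so far. */
  void onRegister(int shuffleId, int stageAttemptNumber) {
    latestAttemptByShuffleId.merge(shuffleId, stageAttemptNumber, Math::max);
  }

  /** Called from the write and report paths; only the latest attempt is accepted. */
  boolean accept(int shuffleId, int stageAttemptNumber) {
    Integer latest = latestAttemptByShuffleId.get(shuffleId);
    return latest != null && stageAttemptNumber == latest;
  }
}
```

A task left over from attempt 0 never re-registers, but its requests still carry attempt number 0, so they are refused once attempt 1 has registered.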

Member Author

cc @yl09099 Please pay close attention to this.

Member Author

This should be solved.

public class ChainShuffleHandleInfo extends ShuffleHandleInfoBase {
private static final Logger LOGGER = LoggerFactory.getLogger(MutableShuffleHandleInfo.class);

private Map<Integer, List<ShuffleServerInfo>> currentPartitionToServers;
Member Author

Emmm. This is not right.

private ShuffleHandleInfo current;
private LinkedList<ShuffleHandleInfo> historyHandles;

* When a Stage retry occurs, replace the current PartitionToShuffleServer and record the
* historical PartitionToShuffleServe.
*/
default void replaceCurrentShuffleHandleInfo(
Member Author

I think there is no need to introduce an extra general interface method here. If you want to update the handleInfo, you could cast it to ChainShuffleHandleInfo and then update its inner current handle.
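
A simplified sketch of the cast-based alternative follows. The types below are stripped-down stand-ins that reuse the names from the diff; `SimpleHandleInfo`, `HandleUpdater`, and `replaceCurrent` are hypothetical and only show the shape of the suggestion.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Stand-in for the general interface, which stays free of retry-specific methods.
interface ShuffleHandleInfo {}

// Hypothetical concrete handle used only for this example.
class SimpleHandleInfo implements ShuffleHandleInfo {
  final String servers;

  SimpleHandleInfo(String servers) {
    this.servers = servers;
  }
}

// Simplified stand-in: holds the current handle plus the history of replaced ones.
class ChainShuffleHandleInfo implements ShuffleHandleInfo {
  private ShuffleHandleInfo current;
  private final LinkedList<ShuffleHandleInfo> historyHandles = new LinkedList<>();

  ChainShuffleHandleInfo(ShuffleHandleInfo initial) {
    this.current = initial;
  }

  synchronized void replaceCurrent(ShuffleHandleInfo next) {
    historyHandles.add(current);
    current = next;
  }

  synchronized ShuffleHandleInfo getCurrent() {
    return current;
  }

  synchronized List<ShuffleHandleInfo> getHistoryHandles() {
    return new ArrayList<>(historyHandles);
  }
}

class HandleUpdater {
  /** Casts at the call site instead of widening the general interface. */
  static void onStageRetry(ShuffleHandleInfo handle, ShuffleHandleInfo next) {
    if (!(handle instanceof ChainShuffleHandleInfo)) {
      throw new IllegalStateException("Stage retry requires a ChainShuffleHandleInfo");
    }
    ((ChainShuffleHandleInfo) handle).replaceCurrent(next);
  }
}
```

The trade-off is an explicit type check at the one place that handles stage retry, in exchange for keeping the shared interface small.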

@Override
public boolean reassignOnStageResubmit(
int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
synchronized (reassignLock) {
Member Author

The reassignLock covers the whole app, but given this RPC's original semantics, I think the lock should only be applied at the shuffleId level.
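
A per-shuffle lock could be kept in a concurrent map, as in the sketch below. The `ShuffleLocks` class is hypothetical, and the body of `getShuffleWriteLock` is an assumption about how such a helper might be written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder of one read-write lock per shuffleId.
class ShuffleLocks {
  private final Map<Integer, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

  private ReentrantReadWriteLock lockFor(int shuffleId) {
    // computeIfAbsent creates the lock atomically on first use.
    return locks.computeIfAbsent(shuffleId, id -> new ReentrantReadWriteLock(true));
  }

  ReentrantReadWriteLock.WriteLock getShuffleWriteLock(int shuffleId) {
    return lockFor(shuffleId).writeLock();
  }

  ReentrantReadWriteLock.ReadLock getShuffleReadLock(int shuffleId) {
    return lockFor(shuffleId).readLock();
  }
}
```

Reassignment for shuffle 1 then serializes only with other work on shuffle 1, while reassignment for shuffle 2 can proceed in parallel.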

synchronized (reassignLock) {
String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
Boolean needReassign =
rssStageResubmitManager.recordAndGetServerAssignedInfo(stageIdAndAttempt);
Member Author

And if the attempt number is less than the existing max attempt number, the reassignment should be treated as illegal.
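
The check could be sketched as follows, assuming the manager tracks the highest attempt number seen per stage. `ReassignTracker` and its `Decision` enum are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker of the highest stage attempt that triggered a reassignment.
class ReassignTracker {
  enum Decision {
    REASSIGN, // first request for a new, higher attempt
    ALREADY_DONE, // another task of the same attempt already triggered it
    REJECT_STALE // request from an attempt older than the recorded maximum
  }

  private final Map<Integer, Integer> maxAttemptByStage = new HashMap<>();

  synchronized Decision onResubmit(int stageId, int stageAttemptNumber) {
    Integer max = maxAttemptByStage.get(stageId);
    if (max != null && stageAttemptNumber < max) {
      return Decision.REJECT_STALE;
    }
    if (max != null && stageAttemptNumber == max) {
      return Decision.ALREADY_DONE;
    }
    maxAttemptByStage.put(stageId, stageAttemptNumber);
    return Decision.REASSIGN;
  }
}
```

Only the first caller of each new attempt performs the reassignment; later callers of the same attempt reuse its result, and callers from older attempts are refused.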


public class RssStageResubmitManager {
/** A list of shuffleServer for Write failures */
private Set<String> failuresShuffleServerIds;
Member Author

This lacks a default constructor to initialize these vars.
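
For illustration, a default constructor that initializes the field might look like the sketch below. `RssStageResubmitManagerSketch` is a hypothetical stand-in, and the choice of a concurrent set and the method names are assumptions rather than the project's code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in showing the field initialized in a default constructor.
class RssStageResubmitManagerSketch {
  /** Ids of shuffle servers that saw write failures. */
  private final Set<String> failuresShuffleServerIds;

  RssStageResubmitManagerSketch() {
    // A concurrent set, assuming failures may be recorded from several threads.
    this.failuresShuffleServerIds = ConcurrentHashMap.newKeySet();
  }

  void recordFailuresShuffleServer(String shuffleServerId) {
    failuresShuffleServerIds.add(shuffleServerId);
  }

  Set<String> getFailuresShuffleServerIds() {
    return failuresShuffleServerIds;
  }
}
```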

@yl09099 yl09099 force-pushed the stageRetry2 branch 17 times, most recently from d922716 to ae02409 Compare May 21, 2024 06:07
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos;

public class ChainShuffleHandleInfo extends ShuffleHandleInfoBase {
Member Author

How about renaming this to StageAttemptShuffleHandleInfo?

.build();
replicaServersProto.put(replicaServerEntry.getKey(), item);
}
Map<Integer, RssProtos.PartitionReplicaServers> partitionToServers = new HashMap<>();
Member Author

Why remove the synchronized?

@Override
public boolean reassignOnStageResubmit(
int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
ReentrantReadWriteLock.WriteLock shuffleWriteLock = getShuffleWriteLock(shuffleId);
Member Author

I think this could also be added to the StageResubmitManager.


- MutableShuffleHandleInfo reassignOnBlockSendFailure(
+ ChainShuffleHandleInfo reassignOnBlockSendFailure(
Member Author

This should still be MutableShuffleHandleInfo.

@yl09099 yl09099 force-pushed the stageRetry2 branch 4 times, most recently from 522926d to 5c9d9e3 Compare May 24, 2024 09:42
Successfully merging this pull request may close these issues.

[Bug] previous data is not deleted when re-register shuffle after re-assignment for stage recomputing
5 participants