Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1398] fix(mr,tez): Make attempId computable and move it to taskAttemptId in BlockId layout. #1418

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

qijiale76
Copy link
Contributor

@qijiale76 qijiale76 commented Jan 4, 2024

What changes were proposed in this pull request?

Before this PR, in MR and TEZ engine:

  1. attemptId is in sequenceNo of BlockId instead of taskAttemptId.
  2. taskAttemptId is long which is not necessary instead of int.
  3. attempId is fixed 6 bit.

After this PR:

  1. attemptId is in taskAttemptId. This is more reasonable.
  2. taskAttemptId is changed to int.
  3. attempId is calculated from max num of allowed failures and whether speculative execution is enabled.

Why are the changes needed?

Fix: #1398

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UT and integrated tests.

@codecov-commenter
Copy link

codecov-commenter commented Jan 4, 2024

Codecov Report

Attention: Patch coverage is 79.10448% with 14 lines in your changes are missing coverage. Please review.

Project coverage is 54.88%. Comparing base (dd67774) to head (bc5585d).
Report is 4 commits behind head on master.

Files Patch % Lines
...c/main/java/org/apache/tez/common/RssTezUtils.java 76.19% 4 Missing and 1 partial ⚠️
...rg/apache/hadoop/mapred/RssMapOutputCollector.java 0.00% 3 Missing ⚠️
...library/common/shuffle/impl/RssTezFetcherTask.java 0.00% 3 Missing ⚠️
...library/common/shuffle/impl/RssShuffleManager.java 0.00% 2 Missing ⚠️
...n/java/org/apache/hadoop/mapreduce/RssMRUtils.java 95.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1418      +/-   ##
============================================
+ Coverage     54.01%   54.88%   +0.87%     
- Complexity     2863     2868       +5     
============================================
  Files           438      418      -20     
  Lines         24850    22552    -2298     
  Branches       2114     2120       +6     
============================================
- Hits          13423    12378    -1045     
+ Misses        10586     9406    -1180     
+ Partials        841      768      -73     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@qijiale76
Copy link
Contributor Author

@jerqi Could you please provide suggestions on areas that need improvement?

@zuston
Copy link
Member

zuston commented Jan 8, 2024

cc @zhengchenyu could you help review this ?

@zhengchenyu
Copy link
Collaborator

Sine #1529 is merged into master, I think we should review this PR?
After this PR, the blockid calculation for spark, mr, tez will remain consistent. Then we will reduce the probability of overflow problems.

@qijiale76 Can you reconstruct the code according to #1529?

@zuston
Copy link
Member

zuston commented Feb 29, 2024

@qijiale76 Do you want to push this forward?

@qijiale76
Copy link
Contributor Author

@qijiale76 Do you want to push this forward?

Yes, I’ll reconstruct the code next week.

Copy link

github-actions bot commented Mar 21, 2024

Test Results

 2 340 files  ± 0   2 340 suites  ±0   4h 30m 5s ⏱️ - 1m 56s
   908 tests ± 0     907 ✅ ± 0   1 💤 ±0  0 ❌ ±0 
10 551 runs  +10  10 537 ✅ +10  14 💤 ±0  0 ❌ ±0 

Results for commit bc5585d. ± Comparison against base commit 32d533d.

This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
org.apache.uniffle.shuffle.manager.RssShuffleManagerBaseTest ‑ testGetAttemptIdBits
org.apache.uniffle.shuffle.manager.RssShuffleManagerBaseTest ‑ testGetMaxAttemptNo
org.apache.uniffle.client.ClientUtilsTest ‑ testGetMaxAttemptNo
org.apache.uniffle.client.ClientUtilsTest ‑ testGetNumberOfSignificantBits

♻️ This comment has been updated with latest results.

@jerqi jerqi changed the title [#1398] fix(MR)(TEZ): Limit attemptId to 4 bit and move it from 18 bit atomicInt to 21 bit taskAttemptId in 63 bit BlockId. [#1398] fix(mr,tez): Limit attemptId to 4 bit and move it from 18 bit atomicInt to 21 bit taskAttemptId in 63 bit BlockId. Mar 28, 2024
@qijiale76 qijiale76 requested a review from EnricoMi March 28, 2024 11:19
@qijiale76 qijiale76 changed the title [#1398] fix(mr,tez): Limit attemptId to 4 bit and move it from 18 bit atomicInt to 21 bit taskAttemptId in 63 bit BlockId. [#1398] fix(mr,tez): Make attempId computable and move it to taskAttemptId in BlockId layout. Mar 28, 2024
@qijiale76 qijiale76 marked this pull request as ready for review March 28, 2024 11:40
@qijiale76
Copy link
Contributor Author

@EnricoMi Thanks for your very helpful review. I have updated this PR based on your suggestions and by referring to Spark's implementation. Could you please review the latest code again?

@qijiale76 qijiale76 requested a review from EnricoMi April 17, 2024 10:39
@@ -143,7 +143,7 @@ public int hashCode() {
return Objects.hash(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
}

public long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
public long getBlockId(int sequenceNo, int partitionId, int taskAttemptId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this long -> int change here should be reverted, because here we check the original long task attempt id is within block id layout constraints. Only task attempt ids used after the block id accepted them are reduced to int.

taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));

return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the taskAttemptId can be long here as this before block id layout checks the bit size constraint (though we feed this method only with int taskAttemptIds produced by RssMRUtils.createRssTaskAttemptId()):

Suggested change
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {


return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}

public static long getTaskAttemptId(long blockId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task attempt id is derived from the block id, hence it is reduced in its bit size:

Suggested change
public static long getTaskAttemptId(long blockId) {
public static int getTaskAttemptId(long blockId) {

The caller of this method can continue to upcast the returned int to long, no problem.


return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}

public static long getTaskAttemptId(long blockId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task attempt id is derived from the block id, hence it is reduced in its bit size:

Suggested change
public static long getTaskAttemptId(long blockId) {
public static int getTaskAttemptId(long blockId) {

The caller of this method can continue to upcast the returned int to long, no problem.

taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));

return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the taskAttemptId can be long here as this before block id layout checks the bit size constraint (though we feed this method only with int taskAttemptIds produced by RssTezUtils.createRssTaskAttemptId()):

Suggested change
public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {

@@ -185,13 +185,13 @@ public BlockId asBlockId(long blockId) {
blockId, this, getSequenceNo(blockId), getPartitionId(blockId), getTaskAttemptId(blockId));
}

public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@@ -64,7 +64,7 @@ public class SortWriteBufferManager<K, V> {
private final Counters.Counter mapOutputRecordCounter;
private long uncompressedDataLen = 0;
private long compressTime = 0;
private final long taskAttemptId;
private final int taskAttemptId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about restricting taskAttemptIds to int in such places.

Here is the situation:

  1. Spark, Tez and MR provide us with long task attempt ids (for Tez and MR, (taskId, attemptId) constitutes a long task attempt id, which we restrict to int for similar reasons as in 2.)
  2. for the purpose of the block id, we limit those long task attempt ids to int, since we allow only less that 32 bits for it
  3. the task attempt id retrieved from the block id is int because of that
  4. still, all other places could continue to work with long task attempt ids if that makes no difference for that code, up-casting int task attempt ids to long does not harm, as long as the code works with long.

This allows to support truly long task attempt ids without reverting such code changes in the future.

@zuston @jerqi @zhengchenyu what do you think?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improvement] [TEZ][MR] blockid will be overflows when the shuffle data is large enough.
5 participants