[#1398] fix(mr,tez): Make attempId computable and move it to taskAttemptId in BlockId layout. #1418
base: master
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@             Coverage Diff              @@
##             master    #1418      +/-   ##
============================================
+ Coverage     54.01%   54.88%   +0.87%
- Complexity     2863     2868       +5
============================================
  Files           438      418      -20
  Lines         24850    22552    -2298
  Branches       2114     2120       +6
============================================
- Hits          13423    12378    -1045
+ Misses        10586     9406    -1180
+ Partials        841      768      -73

☔ View full report in Codecov by Sentry.
@jerqi Could you please provide suggestions on areas that need improvement?
cc @zhengchenyu could you help review this?
Since #1529 is merged into master, I think we should review this PR. @qijiale76 Can you rework the code according to #1529?
@qijiale76 Do you want to push this forward?
Yes, I’ll rework the code next week.
… taskAttemptId in BlockId.
Test Results
2 340 files ±0, 2 340 suites ±0, 4h 30m 5s ⏱️ -1m 56s
Results for commit bc5585d. ± Comparison against base commit 32d533d.
This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
@EnricoMi Thanks for your very helpful review. I have updated this PR based on your suggestions and by referring to Spark's implementation. Could you please review the latest code again?
@@ -143,7 +143,7 @@ public int hashCode() {
     return Objects.hash(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
   }

-  public long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+  public long getBlockId(int sequenceNo, int partitionId, int taskAttemptId) {
I think this long -> int change here should be reverted, because this is where we check that the original long task attempt id is within the block id layout constraints. Only task attempt ids used after the block id has accepted them are reduced to int.
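To illustrate the point, here is a minimal sketch of a layout whose getBlockId accepts a long task attempt id and rejects values that do not fit into the configured bits. The class name BlockIdLayoutSketch, the checkRange helper, and the field order (sequence number in the high bits, then partition id, then task attempt id) are assumptions for illustration only, not the actual BlockIdLayout code of this project.

```java
/** Hypothetical sketch of a block id layout; not the project's BlockIdLayout. */
final class BlockIdLayoutSketch {
  final int sequenceNoBits;
  final int partitionIdBits;
  final int taskAttemptIdBits;

  BlockIdLayoutSketch(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
    // Assumption: the three fields fill the 63 non-sign bits of a long,
    // and each field is small enough to be read back as a non-negative int.
    if (sequenceNoBits + partitionIdBits + taskAttemptIdBits != 63) {
      throw new IllegalArgumentException("field bits must sum to 63");
    }
    for (int bits : new int[] {sequenceNoBits, partitionIdBits, taskAttemptIdBits}) {
      if (bits < 1 || bits > 31) {
        throw new IllegalArgumentException("each field needs between 1 and 31 bits");
      }
    }
    this.sequenceNoBits = sequenceNoBits;
    this.partitionIdBits = partitionIdBits;
    this.taskAttemptIdBits = taskAttemptIdBits;
  }

  /** Takes the task attempt id as long so the bit-size check sees the original value. */
  long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
    checkRange("sequenceNo", sequenceNo, sequenceNoBits);
    checkRange("partitionId", partitionId, partitionIdBits);
    checkRange("taskAttemptId", taskAttemptId, taskAttemptIdBits);
    return ((long) sequenceNo << (partitionIdBits + taskAttemptIdBits))
        | ((long) partitionId << taskAttemptIdBits)
        | taskAttemptId;
  }

  private static void checkRange(String name, long value, int bits) {
    long max = (1L << bits) - 1;
    if (value < 0 || value > max) {
      throw new IllegalArgumentException(
          name + " " + value + " does not fit into " + bits + " bits");
    }
  }
}
```

With an int parameter, a caller holding a long would have to narrow it before the call, and an out-of-range value could be truncated silently before the check ever runs. Keeping the parameter long lets the check reject it instead.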
-        taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
-    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Technically, the taskAttemptId can be long here, as this is before the block id layout checks the bit size constraint (though we feed this method only with int taskAttemptIds produced by RssMRUtils.createRssTaskAttemptId()):
-  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+  public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {

-    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+    return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
   }

   public static long getTaskAttemptId(long blockId) {
This task attempt id is derived from the block id, hence it is reduced in its bit size:
-  public static long getTaskAttemptId(long blockId) {
+  public static int getTaskAttemptId(long blockId) {
The caller of this method can continue to upcast the returned int to long, no problem.
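As a rough sketch of why the int return type is safe: a task attempt id read back from a block id occupies fewer than 32 bits, so it always fits into a non-negative int, and callers that work with long can widen it implicitly. The class name TaskAttemptIdExtraction and the constant TASK_ATTEMPT_ID_BITS = 21 are assumptions for illustration, and the sketch assumes the task attempt id sits in the lowest bits of the block id.

```java
/** Hypothetical sketch; assumes the task attempt id occupies the lowest bits of a block id. */
final class TaskAttemptIdExtraction {
  // Assumed width of the task attempt id field; must stay below 32 for the int cast to be safe.
  static final int TASK_ATTEMPT_ID_BITS = 21;

  /** Masks out the low bits of the block id; the result always fits into an int. */
  static int getTaskAttemptId(long blockId) {
    long mask = (1L << TASK_ATTEMPT_ID_BITS) - 1;
    return (int) (blockId & mask);
  }

  /** A caller that keeps working with long ids simply relies on implicit widening. */
  static long getTaskAttemptIdAsLong(long blockId) {
    return getTaskAttemptId(blockId); // int -> long widening, no cast needed
  }
}
```

The widening conversion from int to long is lossless in Java, which is why narrowing only the return type of the extraction method does not force any change on its callers.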

-    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+    return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
   }

   public static long getTaskAttemptId(long blockId) {
This task attempt id is derived from the block id, hence it is reduced in its bit size:
-  public static long getTaskAttemptId(long blockId) {
+  public static int getTaskAttemptId(long blockId) {
The caller of this method can continue to upcast the returned int to long, no problem.
-        taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
-    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Technically, the taskAttemptId can be long here, as this is before the block id layout checks the bit size constraint (though we feed this method only with int taskAttemptIds produced by RssTezUtils.createRssTaskAttemptId()):
-  public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+  public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
@@ -185,13 +185,13 @@ public BlockId asBlockId(long blockId) {
         blockId, this, getSequenceNo(blockId), getPartitionId(blockId), getTaskAttemptId(blockId));
   }

   public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
same here
@@ -64,7 +64,7 @@ public class SortWriteBufferManager<K, V> {
   private final Counters.Counter mapOutputRecordCounter;
   private long uncompressedDataLen = 0;
   private long compressTime = 0;
-  private final long taskAttemptId;
+  private final int taskAttemptId;
I am not sure about restricting taskAttemptIds to int in such places.
Here is the situation:
1. Spark, Tez and MR provide us with long task attempt ids (for Tez and MR, (taskId, attemptId) constitutes a long task attempt id, which we restrict to int for similar reasons as in 2.)
2. For the purpose of the block id, we limit those long task attempt ids to int, since we allow fewer than 32 bits for it.
3. The task attempt id retrieved from the block id is int because of that.
4. Still, all other places could continue to work with long task attempt ids if that makes no difference for that code. Up-casting int task attempt ids to long does no harm, as long as the code works with long.
This allows us to support truly long task attempt ids in the future without having to revert such code changes.
@zuston @jerqi @zhengchenyu what do you think?
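For readers following the thread, the idea of deriving a compact task attempt id from (taskId, attemptId) can be sketched as below. This is only one possible arrangement under stated assumptions: the class name TaskAttemptIdSketch, the method createTaskAttemptId, its parameters, and the choice to put the attempt number in the high bits are all hypothetical and are not taken from RssMRUtils.createRssTaskAttemptId() or RssTezUtils.createRssTaskAttemptId().

```java
/** Hypothetical sketch of packing (taskId, attemptId) into the task attempt id bits. */
final class TaskAttemptIdSketch {
  /**
   * Packs attemptId into the high bits and taskId into the low bits of a value
   * that fits into taskAttemptIdBits bits. Inputs that do not fit are rejected
   * instead of being truncated.
   */
  static int createTaskAttemptId(
      long taskId, int attemptId, int maxAttempts, int taskAttemptIdBits) {
    if (maxAttempts < 1 || taskAttemptIdBits < 1 || taskAttemptIdBits > 31) {
      throw new IllegalArgumentException("invalid maxAttempts or taskAttemptIdBits");
    }
    // Bits needed to represent the attempt numbers 0 .. maxAttempts - 1.
    int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttempts - 1);
    int taskIdBits = taskAttemptIdBits - attemptBits;
    if (taskIdBits < 1) {
      throw new IllegalArgumentException("no bits left for the task id");
    }
    if (attemptId < 0 || attemptId >= maxAttempts) {
      throw new IllegalArgumentException("attemptId " + attemptId + " out of range");
    }
    if (taskId < 0 || taskId >= (1L << taskIdBits)) {
      throw new IllegalArgumentException(
          "taskId " + taskId + " does not fit into " + taskIdBits + " bits");
    }
    return (attemptId << taskIdBits) | (int) taskId;
  }
}
```

Under these assumptions, a layout with 21 task attempt id bits and at most 4 attempts leaves 19 bits for the task id. The returned int can be widened to long wherever surrounding code keeps working with long ids, which matches point 4 above.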
What changes were proposed in this pull request?
Before this PR, in MR and TEZ engine:
After this PR:
Why are the changes needed?
Fix: #1398
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UT and integrated tests.