[#1594] improvement(client):support generating larger block size during shuffle map task by spill partial partitions data #1670
Conversation
… partial partitions data
# Conflicts: # client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
… partial partitions data
@rickyma Could you help me review this pull request?
Could you paste some test results to the community for this feature?
Overall LGTM. Left some minor comments.
List<Integer> partitionList =
    new ArrayList<Integer>() {
      {
        addAll(buffers.keySet());
I think this can be simplified as:
List<Integer> partitionList = new ArrayList<>(buffers.keySet());
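A minimal, runnable illustration of why this simplification is safe: the double-brace idiom creates an anonymous `ArrayList` subclass holding a reference to its enclosing scope, while the copy constructor produces the same list contents without that overhead. The `buffers` map here is a hypothetical stand-in for the manager's real field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionListDemo {
    public static void main(String[] args) {
        // Hypothetical stand-in for WriteBufferManager's buffers field.
        Map<Integer, String> buffers = new LinkedHashMap<>();
        buffers.put(1, "a");
        buffers.put(2, "b");

        // Double-brace initialization: allocates an anonymous ArrayList subclass.
        List<Integer> viaDoubleBrace = new ArrayList<Integer>() {
            {
                addAll(buffers.keySet());
            }
        };

        // Suggested simplification: plain copy constructor, same contents.
        List<Integer> viaCopyCtor = new ArrayList<>(buffers.keySet());

        System.out.println(viaDoubleBrace.equals(viaCopyCtor)); // true
    }
}
```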
      }
    };
if (bufferSpillRatio < 1.0) {
  Collections.sort(
This can be simplified as:
partitionList.sort(Comparator.comparingInt(o -> buffers.get(o) == null ? 0 : buffers.get(o).getMemoryUsed()).reversed());
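A self-contained sketch of the suggested one-liner, sorting partitions by buffer size in descending order so the largest ones come first. The `Buffer` class below is a hypothetical stand-in exposing only `getMemoryUsed()`; the real buffer type in the PR has more state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpillSortDemo {
    // Minimal stand-in for the real write buffer; only memoryUsed matters here.
    static class Buffer {
        final int memoryUsed;
        Buffer(int memoryUsed) { this.memoryUsed = memoryUsed; }
        int getMemoryUsed() { return memoryUsed; }
    }

    public static void main(String[] args) {
        Map<Integer, Buffer> buffers = new HashMap<>();
        buffers.put(1, new Buffer(10));
        buffers.put(2, new Buffer(30));
        buffers.put(3, new Buffer(20));

        List<Integer> partitionList = new ArrayList<>(buffers.keySet());
        // Largest buffers first; missing entries count as size 0.
        partitionList.sort(
            Comparator.comparingInt(
                    (Integer o) -> buffers.get(o) == null ? 0 : buffers.get(o).getMemoryUsed())
                .reversed());

        System.out.println(partitionList); // [2, 3, 1]
    }
}
```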
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ "],number of blocks["
Nit, add a space here, for a better log output:
],number of blocks[
-> ], number of blocks[
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ "],number of blocks["
+ result.size()
+ "],flush ratio["
Nit, add a space here, for a better log output:
],flush ratio[
-> ], flush ratio[
LOG.info(
    String.format(
        "ShuffleBufferManager spill for buffer size exceeding spill threshold,"
            + "usedBytes[%d],inSendListBytes[%d],spill size threshold[%d]",
Nit, a better log output:
" usedBytes[%d], inSendListBytes[%d], spill size threshold[%d]",
… partial partitions data
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
… partial partitions data
LGTM.
LGTM.
LGTM. Although I think if you want to achieve a bigger block size, a temporary executor-side local file could perhaps be used to store the task's partition shuffle data.
What changes were proposed in this pull request?
When spilling shuffle data, we only spill the reduce partitions that hold the majority of the space.
So, in each spill, the WriteBufferManager.clear() method needs one extra step: sort the candidate buffers by size and select the top-N buffers to spill.
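The partial-spill selection described above can be sketched as follows. This is a simplified illustration, not the PR's actual code: the method name `pickPartitionsToSpill`, the `Map<Integer, Long>` of partition sizes, and the ratio semantics (spill the largest partitions until the configured fraction of total buffered bytes is reached) are assumptions for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartialSpillSketch {
    /**
     * Pick the largest partitions whose combined size reaches spillRatio of
     * the total buffered bytes; the remaining partitions stay in memory so
     * their blocks can keep growing (yielding larger block sizes overall).
     */
    static List<Integer> pickPartitionsToSpill(Map<Integer, Long> sizes, double spillRatio) {
        List<Integer> partitions = new ArrayList<>(sizes.keySet());
        // Largest partitions first.
        partitions.sort(Comparator.comparingLong((Integer p) -> sizes.get(p)).reversed());

        long total = sizes.values().stream().mapToLong(Long::longValue).sum();
        long target = (long) (total * spillRatio);

        List<Integer> toSpill = new ArrayList<>();
        long spilled = 0;
        for (Integer p : partitions) {
            if (spilled >= target) {
                break; // enough bytes selected; keep the rest buffered
            }
            toSpill.add(p);
            spilled += sizes.get(p);
        }
        return toSpill;
    }

    public static void main(String[] args) {
        Map<Integer, Long> sizes = new LinkedHashMap<>();
        sizes.put(0, 100L);
        sizes.put(1, 700L);
        sizes.put(2, 200L);
        // Spilling 50% of 1000 bytes: partition 1 (700 bytes) alone covers it.
        System.out.println(pickPartitionsToSpill(sizes, 0.5)); // [1]
    }
}
```

With `spillRatio = 1.0` every partition is selected, which matches the pre-existing behavior of spilling everything.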
Why are the changes needed?
related feature #1594
Does this PR introduce any user-facing change?
No.
How was this patch tested?
new UTs.