Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1594] improvement(client):support generating larger block size during shuffle map task by spill partial partitions data #1670

Merged
merged 15 commits into from May 14, 2024

Conversation

leslizhang
Copy link
Contributor

What changes were proposed in this pull request?

when spilling shuffle data, we just spill part of the reduce partition datas which hold the major space.
so, in each spilling process, the WriteBufferManager.clear() method should implement one more logic: sort the to-be spilled buffers by their size and select the top-N buffers to spill.

Why are the changes needed?

related feature #1594

Does this PR introduce any user-facing change?

No.

How was this patch tested?

new UTs.

leslizhang added 4 commits April 28, 2024 10:35
# Conflicts:
#	client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
Copy link

github-actions bot commented Apr 28, 2024

Test Results

 2 391 files  ±0   2 391 suites  ±0   4h 58m 3s ⏱️ + 1m 13s
   929 tests +1     928 ✅ +1   1 💤 ±0  0 ❌ ±0 
10 763 runs  +9  10 749 ✅ +9  14 💤 ±0  0 ❌ ±0 

Results for commit a650c97. ± Comparison against base commit 8e26a34.

♻️ This comment has been updated with latest results.

@jerqi
Copy link
Contributor

jerqi commented Apr 29, 2024

@rickyma Could you help me review this pull request?

@jerqi
Copy link
Contributor

jerqi commented Apr 29, 2024

Could you paste some test results to the community for this feature?

Copy link
Contributor

@rickyma rickyma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Left some minor comments.

List<Integer> partitionList =
new ArrayList<Integer>() {
{
addAll(buffers.keySet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be simplified as:

List<Integer> partitionList = new ArrayList<>(buffers.keySet());

}
};
if (bufferSpillRatio < 1.0) {
Collections.sort(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified as:

partitionList.sort(Comparator.comparingInt(o -> buffers.get(o) == null ? 0 : buffers.get(o).getMemoryUsed()).reversed());

@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ "],number of blocks["
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, add a space here, for a better log output:
],number of blocks[ -> ], number of blocks[

@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ "],number of blocks["
+ result.size()
+ "],flush ratio["
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, add a space here, for a better log output:
],flush ratio[ -> ], flush ratio[

LOG.info(
String.format(
"ShuffleBufferManager spill for buffer size exceeding spill threshold,"
+ "usedBytes[%d],inSendListBytes[%d],spill size threshold[%d]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, a better log output:

" usedBytes[%d], inSendListBytes[%d], spill size threshold[%d]",

@zuston
Copy link
Member

zuston commented May 13, 2024

Could you help review this again? @jerqi @rickyma

Copy link
Contributor

@rickyma rickyma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Contributor

@jerqi jerqi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Member

@zuston zuston left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Although I think if you want to acheive bigger block size, maybe the temproal executor side localfile could be implemented to store the task partition shuffle data.

@zuston zuston merged commit dc022d6 into apache:master May 14, 2024
41 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants