[FEATURE] support generating larger block size during shuffle map task by spill partial partitions data #1594

Open

leslizhang opened this issue Mar 20, 2024 · 3 comments

@leslizhang (Contributor)

Code of Conduct

  • I agree to follow this project's Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the feature

At least three situations can trigger a large number of small blocks in a shuffle map task:

  1. Data skew: when the data processed by a map task mostly belongs to a few reduce tasks, those few reduce tasks alone push the buffered data past the spill threshold, and the remaining long-tail reduce tasks are spilled passively along with them even though they hold little data.
  2. An extremely large number of reduce tasks (>10k): even when each reduce partition holds very little data, the total buffer usage of the shuffle map task's ShuffleBufferManager can still exceed the spill threshold, producing a large number of small blocks.
  3. Memory competition between tasks: when an executor runs multiple tasks concurrently, a task that starts later may fail to acquire enough memory, and the resulting executor-wide memory pressure forces the ShuffleBufferManager to spill as a MemoryConsumer. In that case the ShuffleBufferManager may hold only a small amount of data per reduce partition.

Problems caused by small blocks:

  • Unnecessary network overhead: each send carries less data, which increases the number of network interactions between the executor and the shuffle server; the number of interactions can grow by a factor of 10 or even 100.
  • Larger index files: when the shuffle server persists shuffle data, every block adds an entry to the index file, so many small blocks inflate the index.
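For a rough sense of scale (numbers assumed for illustration): with the default spark.rss.writer.buffer.size of 3 MB, a reduce partition is normally flushed as one block of roughly 3 MB; if a forced spill instead flushes partitions holding only about 30 KB each, the same volume of data is sent as roughly 3 MB / 30 KB = 100 times as many blocks, and therefore about 100 times as many requests and index entries.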

Motivation

No response

Describe the solution

When spilling shuffle data, spill only the reduce partitions whose buffers occupy the most space.
So, in each spill, the WriteBufferManager.clear() method needs one extra step: sort the to-be-spilled buffers by size and select only the top-N largest buffers to spill, as in the sketch below; the small buffers stay in memory and keep growing into larger blocks.
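A minimal sketch of that selection step, assuming a hypothetical view of the per-partition buffer sizes; the class, method, and parameter names below are illustrative, not the actual WriteBufferManager API:

```java
// Illustrative only: pick the partitions whose buffers hold the most data
// until enough memory would be released. bufferSizes and bytesToRelease are
// assumed inputs, not real WriteBufferManager fields.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class PartialSpillSelector {

  /** Pick the largest partition buffers first; stop once the spill goal is met. */
  public static List<Integer> selectPartitionsToSpill(
      Map<Integer, Long> bufferSizes, long bytesToRelease) {
    List<Map.Entry<Integer, Long>> entries = new ArrayList<>(bufferSizes.entrySet());
    // Sort by buffer size, largest first.
    entries.sort(
        Comparator.comparingLong((Map.Entry<Integer, Long> e) -> e.getValue()).reversed());

    List<Integer> selected = new ArrayList<>();
    long released = 0L;
    for (Map.Entry<Integer, Long> entry : entries) {
      if (released >= bytesToRelease) {
        break; // the remaining (small) buffers stay in memory and keep growing
      }
      selected.add(entry.getKey());
      released += entry.getValue();
    }
    return selected;
  }
}
```

Bounding the spill by a bytes-to-release target (or a fixed top-N) keeps each spill cheap while leaving the long-tail partitions untouched.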

Additional context

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
zuston assigned zuston and leslizhang and unassigned zuston Mar 22, 2024

@zuston (Member) commented Mar 22, 2024

+1.

@EnricoMi (Contributor)

Sorting the partition data by the reduce partition id should maximize block sizes. Is that an option?

@leslizhang (Contributor, Author)

With this feature, the buffer size of a single partition (spark.rss.writer.buffer.size) can be increased to a larger value, for example from the current default of 3 MB to 10 MB or more.
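As a hypothetical illustration of that tuning (the 10m value below is only an example, not a recommended default), the setting could be raised through SparkConf:

```java
// Illustrative only: raise the per-partition writer buffer once partial spill
// is available. The value "10m" is an example, not a recommendation.
import org.apache.spark.SparkConf;

public class RssBufferSizeExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("rss-buffer-size-example")
        .set("spark.rss.writer.buffer.size", "10m");
    // ...pass conf to the SparkContext / SparkSession as usual
  }
}
```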

zuston pushed a commit that referenced this issue May 14, 2024
…fle map task by spill partial partitions data (#1670)

### What changes were proposed in this pull request?

When spilling shuffle data, spill only the reduce partitions whose buffers occupy the most space.
So, in each spill, the WriteBufferManager.clear() method needs one extra step: sort the to-be-spilled buffers by size and select only the top-N largest buffers to spill.

### Why are the changes needed?

Related feature: #1594

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests (UTs).

---------

Co-authored-by: leslizhang <leslizhang@tencent.com>