
[#1239] Remote merge on the shuffle server side. #1660

Draft · wants to merge 5 commits into master

Conversation

@zhengchenyu (Collaborator) commented Apr 18, 2024

Draft PR for #1239.

@zhengchenyu marked this pull request as draft on April 18, 2024, 11:07
@zhengchenyu (Collaborator, Author) commented

@jerqi @zuston Could you please review this draft PR and the proposal?

@EnricoMi (Contributor) commented

Does the code added here come from Apache Spark?

@zhengchenyu (Collaborator, Author) commented

> Does the code added here come from Apache Spark?

No, I developed it myself.

@zuston (Member) commented Apr 18, 2024

Thanks for proposing this.

I have a question: I remember Spark sorts the data using SortExec in the DataFrame API, which means the sort will not occur in the reader process. @advancedxy may have a good thought about this.

@zuston (Member) commented Apr 18, 2024

> Does the code added here come from Apache Spark?

The detailed design can be found in #1239. The initial motivation is to reduce the disk usage of spill, especially on K8s. With this, we can also speed up the whole shuffle.

@advancedxy (Contributor) commented

Sorry, @zuston, I missed your comment.

> I have a question: I remember Spark sorts the data using SortExec in the DataFrame API, which means the sort will not occur in the reader process. @advancedxy may have a good thought about this.

Yeah, there are two categories of APIs on the Spark side. For the DataFrame/SQL API, ordering is guaranteed by SortExec, so there's no sorting in the shuffle read. For the RDD API, it's possible that a key ordering is defined and sorting occurs in the shuffle read process. From my perspective, Spark analytics workloads should mostly fall in the Dataset/DataFrame category for now.
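To make the RDD-side case concrete, here is a minimal sketch with stock Spark (illustration only, not code from this PR): repartitionAndSortWithinPartitions sets keyOrdering on the resulting ShuffleDependency, so the sort runs in the shuffle reader, which is exactly the work a server-side remote merge could take over.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object ReadSideSortExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("read-side-sort").getOrCreate()
    val sc = spark.sparkContext

    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3)))

    // keyOrdering is set on the ShuffleDependency here, so each reducer
    // sorts its partition while *reading* the shuffle data. A SQL/DataFrame
    // query would instead sort after the exchange, inside SortExec.
    val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
    sorted.collect().foreach(println)

    spark.stop()
  }
}
```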

For this particular feature, I think MR/Tez would benefit substantially (to be frank, I developed a similar shuffle system ~6-7 years ago). The Spark integration is less exciting, though.

@zuston (Member) commented Apr 22, 2024

> For this particular feature, I think MR/Tez would benefit substantially (to be frank, I developed a similar shuffle system ~6-7 years ago). The Spark integration is less exciting, though.

But if we could introduce an extra plan rule that optimizes sort-merge join from SortExec + Exchange into a UniffleSortedExchangeExec, that would benefit from this feature. Is this possible?

@advancedxy (Contributor) commented

> For this particular feature, I think MR/Tez would benefit substantially (to be frank, I developed a similar shuffle system ~6-7 years ago). The Spark integration is less exciting, though.
>
> But if we could introduce an extra plan rule that optimizes sort-merge join from SortExec + Exchange into a UniffleSortedExchangeExec, that would benefit from this feature. Is this possible?

Emmm, maybe in your own forked Spark. For open-source Spark, it might be possible to inject a strategy rule via SparkSessionExtensions.injectQueryPostPlannerStrategyRule to replace SortExec. However, when it comes to Spark's internal EnsureRequirements, you would have to modify the internal logic.
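A rough sketch of what such an injection could look like (an assumed shape, not working Uniffle code; RemoteMergeSortRule and UniffleSortedExchangeExec are hypothetical names, and injectQueryPostPlannerStrategyRule requires a Spark version that ships it):

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

// Hypothetical post-planner rule that looks for SortExec nodes a
// Uniffle-sorted shuffle could make redundant.
case class RemoteMergeSortRule(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case sort @ SortExec(_, _, _, _) =>
      // A real rule would check that the child is a shuffle exchange served
      // by Uniffle and that the ordering can be pushed to the shuffle
      // servers, then emit a UniffleSortedExchangeExec; here the plan is
      // returned unchanged.
      sort
  }
}

class UniffleSparkExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectQueryPostPlannerStrategyRule(RemoteMergeSortRule)
  }
}
```

The extension class would then be registered via spark.sql.extensions. As noted above, anything beyond this, such as teaching EnsureRequirements that a Uniffle exchange already satisfies the required ordering, means modifying Spark internals.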

@zuston (Member) commented Apr 22, 2024

> For this particular feature, I think MR/Tez would benefit substantially (to be frank, I developed a similar shuffle system ~6-7 years ago). The Spark integration is less exciting, though.
>
> But if we could introduce an extra plan rule that optimizes sort-merge join from SortExec + Exchange into a UniffleSortedExchangeExec, that would benefit from this feature. Is this possible?
>
> Emmm, maybe in your own forked Spark. For open-source Spark, it might be possible to inject a strategy rule via SparkSessionExtensions.injectQueryPostPlannerStrategyRule to replace SortExec. However, when it comes to Spark's internal EnsureRequirements, you would have to modify the internal logic.

Yes, this is acceptable if it brings a performance gain. And I think the extra strategy rule could also be hosted by Uniffle if it proves valuable. WDYT?

@advancedxy (Contributor) commented

> And I think the extra strategy rule could also be hosted by Uniffle if it proves valuable. WDYT?

If it's possible by simply injecting a strategy rule, then I'm fine with that. However, if it requires internal changes to Spark, then I don't think it should be hosted/delegated in the Uniffle repo.

@zhengchenyu (Collaborator, Author) commented Apr 23, 2024

@advancedxy @zuston
(1) Remote merge doesn't just apply to sort; it also applies to combine.
You're paying too much attention to sort. In fact, merge may involve sort or combine. For Spark, I think combine is more general. Remote merge also solves the problem of spilling data when combining. For an RDD that requires combine but not sort, the shuffle server sorts by hash(key). This keeps identical keys grouped together as much as possible, so on the reduce side we can combine in memory (see the sketch after this comment).
(2) About sort
I haven't investigated SortExec yet; I will look into it next. My guess is that SortExec was introduced to save a stage. If so, we can add a stage instead, and there is no need to add an extra strategy rule. BTW, if the key of the new shuffle is the same as the previous one, we can get better performance [SPARK-46512].
BTW, although Spark is a better computing framework, it is still based on the MapReduce mechanism, which is no different from Hadoop MapReduce and Tez.
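A minimal sketch of the combine idea from point (1) (illustration only, not the PR's RecordBlob code): if the shuffle server orders records by hash(key), identical keys land inside one run of equal hashes, so the reduce side can combine each run with a small in-memory map instead of building a map over all keys.

```scala
import scala.collection.mutable

object HashOrderedCombine {
  // Assumes `records` is already ordered by key hash, as the shuffle server
  // would deliver it under this proposal.
  def combine[K, V](records: Iterator[(K, V)], mergeValue: (V, V) => V): Iterator[(K, V)] = {
    val buffered = records.buffered
    new Iterator[Iterator[(K, V)]] {
      def hasNext: Boolean = buffered.hasNext
      def next(): Iterator[(K, V)] = {
        val h = buffered.head._1.hashCode()
        // Collapse one run of equal hashes; colliding keys stay distinct.
        val run = mutable.LinkedHashMap.empty[K, V]
        while (buffered.hasNext && buffered.head._1.hashCode() == h) {
          val (k, v) = buffered.next()
          run(k) = run.get(k).map(mergeValue(_, v)).getOrElse(v)
        }
        run.iterator
      }
    }.flatten
  }
}
```

The memory footprint is then bounded by the largest run of colliding hashes rather than by the number of distinct keys.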

@zuston (Member) commented Apr 23, 2024

Thanks for your work, @zhengchenyu. After briefly reviewing the design doc and the concrete code, I have some questions about this feature.

Spark client

  1. What's the difference between RecordBlob and RecordBuffer? The combine/sort distinction is not reflected in the names.
  2. Why not make RMWriteBufferManager extend WriteBufferManager? RMWriteBufferManager can almost replace WriteBufferManager, and we could control whether to sort via config.
  3. Is local sort at the block level enough? Can we make the block bigger when spilling to file?

Shuffle server

  1. If the merge fails, the reducer should fail over to the original mechanism.
  2. The merge process will introduce too much random read. If this happens on HDDs, the whole process is terrible. So the key point is to make the block bigger (sort-merge blocks before flushing to disk to create a bigger one?). But this looks like it will break the original blockId mechanism.

@advancedxy (Contributor) commented

> You're paying too much attention to sort. In fact, merge may involve sort or combine. For Spark, I think combine is more general. Remote merge also solves the problem of spilling data when combining. For an RDD that requires combine but not sort, the shuffle server sorts by hash(key). This keeps identical keys grouped together as much as possible, so on the reduce side we can combine in memory.

The problem is that there's no sort/merge/combine in the ShuffleDependency for SQL/DataFrame workloads, which should already dominate analytics workloads. Hence, remote merge will only benefit plain RDD workloads, which in my opinion is less exciting.

@zhengchenyu (Collaborator, Author) commented

@advancedxy
Yes, it doesn't make sense for Spark SQL right now.

In fact, our cluster mainly uses Hive on Tez at the moment, but we have plans to move to Spark.

For Hive on Tez/MR it makes sense, even though Hive doesn't use the combine feature of MR or Tez. Why? Because the records coming out of the shuffle are sorted, we can combine in memory, so Hive's aggregation runs entirely in memory. In theory the same could be done with Spark SQL: it could use a sorted shuffle and then aggregate in memory, but that would require a lot of changes to Spark SQL. Maybe we should focus on Tez/MR first.
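To spell out why sorted shuffle input keeps aggregation in memory, a small sketch (illustration only, not Hive or Tez code): with equal keys adjacent, one accumulator per run of keys suffices, so nothing needs to spill.

```scala
// Aggregate a stream that is already sorted by key, in constant memory.
def aggregateSorted[K, V](sorted: Iterator[(K, V)], merge: (V, V) => V): Iterator[(K, V)] = {
  val it = sorted.buffered
  new Iterator[(K, V)] {
    def hasNext: Boolean = it.hasNext
    def next(): (K, V) = {
      val (key, first) = it.next()
      var acc = first
      // Fold every consecutive record with the same key into one result.
      while (it.hasNext && it.head._1 == key) {
        acc = merge(acc, it.next()._2)
      }
      (key, acc)
    }
  }
}
```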

@zhengchenyu (Collaborator, Author) commented

> Thanks for your work, @zhengchenyu. After briefly reviewing the design doc and the concrete code, I have some questions about this feature.
>
> Spark client
>
> 1. What's the difference between RecordBlob and RecordBuffer? The combine/sort distinction is not reflected in the names.
> 2. Why not make RMWriteBufferManager extend WriteBufferManager? RMWriteBufferManager can almost replace WriteBufferManager, and we could control whether to sort via config.
> 3. Is local sort at the block level enough? Can we make the block bigger when spilling to file?
>
> Shuffle server
>
> 1. If the merge fails, the reducer should fail over to the original mechanism.
> 2. The merge process will introduce too much random read. If this happens on HDDs, the whole process is terrible. So the key point is to make the block bigger (sort-merge blocks before flushing to disk to create a bigger one?). But this looks like it will break the original blockId mechanism.

Spark client
(1) RecordBlob is used for combine, and RecordBuffer is used for sort.
(2) RMWriteBufferManager should extend WriteBufferManager; I just didn't want to affect other code. If the proposal is accepted, I will do that later.
(3) If we want a bigger block, the best way is to increase the block size.

Shuffle server
(1) Yes. We need to fail over to the original RSS mechanism, especially if we can't load the classes of the key, value, or comparator.
(2) I first tested on HDDs, and it was terrible. Then I tested on an SSD cluster, which was better. We could try to merge the blocks, but that needs a lot of modification. I just wanted to do it quickly, so I reused the original mechanism.

@zuston (Member) commented Apr 23, 2024

First, I think this is meaningful for Tez/MR. For Spark, sorted shuffle is still meaningful; Facebook did similar things in Cosco (their internal remote shuffle service). @jerqi could add some context on that.

> (2) I first tested on HDDs, and it was terrible. Then I tested on an SSD cluster, which was better. We could try to merge the blocks, but that needs a lot of modification. I just wanted to do it quickly, so I reused the original mechanism.

I'm not sure the current design is viable in production with this much random I/O. I'd prefer the most general and stable way to optimize this, if we have one.
