Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Support celeborn sort based shuffle #5675

Merged
merged 43 commits into from May 17, 2024

Conversation

kerwin-zk
Copy link
Contributor

What changes were proposed in this pull request?

#4048

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Copy link

github-actions bot commented May 9, 2024

Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

Copy link

github-actions bot commented May 9, 2024

Run Gluten Clickhouse CI

1 similar comment
Copy link

github-actions bot commented May 9, 2024

Run Gluten Clickhouse CI

Copy link

github-actions bot commented May 9, 2024

Run Gluten Clickhouse CI

Copy link

github-actions bot commented May 9, 2024

Run Gluten Clickhouse CI

Copy link

github-actions bot commented May 9, 2024

Run Gluten Clickhouse CI

@FelixYBW
Copy link
Contributor

FelixYBW commented May 9, 2024

Thank you for the PR! Waiting for long time.

@@ -132,6 +132,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;

arrow::Status sort(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;
Copy link
Contributor

@FelixYBW FelixYBW May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may separate the sort based and hash based shuffle in different files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FelixYBW After Sort's proposal is reviewed, I'll refactor the code structure to meet the requirements of the different files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kerwin-zk After this patch, part of the existing APIs are only used for hash-based shuffle, and the newly added ones are only used for sort-based shuffle, which leads to a mixing of the code. It would be nice to separate the APIs for both writer & reader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marin-ma Shuffle Writer's api has been split into sort and hash, for Shuffle Reader, it just adds Velox Stream parsing, and doesn't differentiate between sort and hash, the core logic is the same, I don't feel the need to split it out separately.

@FelixYBW FelixYBW requested a review from marin-ma May 9, 2024 19:20
Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

arrow::Status VeloxShuffleWriter::stop() {
arrow::Status VeloxShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit) {
currentInputColumnBytes_ += rv->estimateFlatSize();
batches_.push_back(rv);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: looks like this only enqueue the batches, the sort is done in Celeborn side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhouyuan In the HashPartitioner's compute, it will encapsulate the information of each row belonging to the same PartitionId in the RowVector into an int, and put it into the Vector of the map, which is equivalent to replacing the Sort with a HashMap.

Copy link

Run Gluten Clickhouse CI

3 similar comments
Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add cpp UT for sort-based shuffle? If it's possible to implement another LocalRssClient for sort-based shuffle and use it in native tests, we can reuse the existing tests by adding a new test param in VeloxShuffleWriterTest

@@ -148,6 +149,69 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream {
bool closed_ = false;
};

class JavaInputStreamVeloxWrapper final : public JavaInputStreamWrapper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use JavaInputStreamAdapter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing JavaInputStreamAdapter is defined here

class JavaInputStreamAdaptor final : public arrow::io::InputStream {
Can we use this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marin-ma done

return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
schema,
std::move(codec),
options.compressionTypeStr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it's in Velox backend, we can put the str->kind conversion here annd store the velox::common::CompressionKind instead of the compression string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link

Run Gluten Clickhouse CI

6 similar comments
Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments.

std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
auto index = static_cast<int64_t>(vectorIndex) << 32;
for (auto i = 0; i < numRows; ++i) {
auto pid = pidArr[i] % numPartitions_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the same in the function above. Could you extract the common part?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

facebook::velox::OStreamOutputStream* out,
facebook::velox::RowTypePtr* rowTypePtr) {
int64_t rawSize = batch_->size();
batch_->flush(out);
Copy link
Contributor

@marin-ma marin-ma May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to PrestoSerializer implementation, looks like there's an unavoidable extra memory copy here, from the intermediate buffer used by batch_->append to std::ostringstream* output. And the memory used by std::ostringstream* output is not being tracked. Is there any way to track the memory usage?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use IOBufOutputStream so that the memory can be tracked by velox memory pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kerwin-zk Just found that the data stored in IOBufOutputStream might not be contiguous. Let's keep the current implementation as for now. I will find a better way to resolve it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marin-ma That's the problem. If it's not contiguous, there's an extra copy.

for (int start = 0; start < outputSize; start++) {
const int32_t vectorIndex = static_cast<int32_t>(rowVectorIndex.at(start) >> 32);
const int32_t rowIndex = static_cast<int32_t>(rowVectorIndex.at(start) & 0xFFFFFFFFLL);
groupedIndices[vectorIndex].push_back({rowIndex, 1});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mapping from the input vector index + row index is already ordered. Can we directly build the index range in Partitioner::compute? Then the adjacent row index of the same partition id can be merged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marin-ma Partitioner::compute belongs to the core part of the cpp, it is not appropriate to introduce Velox-related IndexRange, I will implement this part of the logic in evictRowVector.

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments. Thanks.

@@ -16,6 +16,9 @@
*/

#include "ShuffleReader.h"

#include <jni/JniCommon.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintended change?

params.push_back(ShuffleTestParams{PartitionWriterType::kRss, compression, compressionThreshold, 0});
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0});
params.push_back(ShuffleTestParams{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the "compressionThreshold" is not used in the "sort + rss" code path. We can move this combination out of this loop to reduce the number of test cases.

Comment on lines 363 to 375
std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
if (GetParam().shuffleWriterType == kHashShuffle) {
GLUTEN_ASSIGN_OR_THROW(
shuffleWriter,
VeloxHashBasedShuffleWriter::create(
kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool));
} else if (
GetParam().shuffleWriterType == kSortShuffle && GetParam().partitionWriterType == PartitionWriterType::kRss) {
GLUTEN_ASSIGN_OR_THROW(
shuffleWriter,
VeloxSortBasedShuffleWriter::create(
kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract this logic into a function to avoid copy-paste.

Object pusher,
String partitionWriterType);
String partitionWriterType,
String shuffleWriterType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can consider separating the JNI call for hash/sort shuffle in the future.

@marin-ma
Copy link
Contributor

marin-ma commented May 16, 2024

@kerwin-zk It's weird that I cannot reproduce the UT failure on my dev environment (ubuntu 22.04), but can reproduce that in the docker container (centos stream 8).

@kerwin-zk
Copy link
Contributor Author

@kerwin-zk It's weird that I cannot reproduce the UT failure on my dev environment (ubuntu 22.04), but can reproduce that in the docker container (centos stream 8).

@marin-ma Yes, I am checking this issue today. I tested it using centos8 and ubuntu2004, but the local development environment could not reproduce it.

@marin-ma
Copy link
Contributor

marin-ma commented May 16, 2024

I tested it using centos8 and ubuntu2004, but the local development environment could not reproduce it.

@kerwin-zk Do you mean it cannot be reproduced on centos8? What GCC version do you use? It's gcc-9 in docker https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_docker.yml#L524

@kerwin-zk
Copy link
Contributor Author

I tested it using centos8 and ubuntu2004, but the local development environment could not reproduce it.

@kerwin-zk Do you mean it cannot be reproduced on centos8? What GCC version do you use? It's gcc-9 in docker https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_docker.yml#L524

@marin-ma centos8 gcc10

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link

Run Gluten Clickhouse CI

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Some minor comments. Thanks!

@@ -16,9 +16,28 @@
*/

#include "shuffle/HashPartitioner.h"
#include <iostream>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.

@@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) {
return arrow::Status::OK();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return arrow::Status::NotImplemented("Invalid code path for local shuffle writer: sort based is not supported.");

enum PartitionWriterType { kLocal, kRss };

struct ShuffleReaderOptions {
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
std::string compressionTypeStr = "lz4";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add static const std::string kDefaultCompressionTypeStr = "lz4"; in above code and use kDefaultCompressionTypeStr here.

@@ -56,6 +63,7 @@ struct PartitionWriterOptions {

int32_t compressionThreshold = kDefaultCompressionThreshold;
arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
std::string compressionTypeStr = "lz4";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kDefaultCompressionTypeStr

@@ -46,7 +46,7 @@ class VeloxColumnarBatch final : public ColumnarBatch {
facebook::velox::RowVectorPtr getRowVector() const;
facebook::velox::RowVectorPtr getFlattenedRowVector();

private:
public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintended change?

Comment on lines 52 to 88
#if VELOX_SHUFFLE_WRITER_PRINT

#define VsPrint Print
#define VsPrintLF PrintLF
#define VsPrintSplit PrintSplit
#define VsPrintSplitLF PrintSplitLF
#define VsPrintVectorRange PrintVectorRange
#define VS_PRINT PRINT
#define VS_PRINTLF PRINTLF
#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
#define VS_PRINT_CONTAINER PRINT_CONTAINER
#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING

#else // VELOX_SHUFFLE_WRITER_PRINT

#define VsPrint(...) // NOLINT
#define VsPrintLF(...) // NOLINT
#define VsPrintSplit(...) // NOLINT
#define VsPrintSplitLF(...) // NOLINT
#define VsPrintVectorRange(...) // NOLINT
#define VS_PRINT(a)
#define VS_PRINTLF(a)
#define VS_PRINT_FUNCTION_NAME()
#define VS_PRINT_FUNCTION_SPLIT_LINE()
#define VS_PRINT_CONTAINER(c)
#define VS_PRINT_CONTAINER_TO_STRING(c)
#define VS_PRINT_CONTAINER_2_STRING(c)
#define VS_PRINT_VECTOR_TO_STRING(v)
#define VS_PRINT_VECTOR_2_STRING(v)
#define VS_PRINT_VECTOR_MAPPING(v)

#endif // end of VELOX_SHUFFLE_WRITER_PRINT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these macros used by the sort shuffle writer? If not, please remove.

@marin-ma marin-ma changed the title [WIP][VL] Support celeborn sort based shuffle [VL] Support celeborn sort based shuffle May 17, 2024
Copy link

Run Gluten Clickhouse CI

@marin-ma marin-ma merged commit dfac04f into apache:main May 17, 2024
43 checks passed
@GlutenPerfBot
Copy link
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_5675_time.csv log/native_master_05_13_2024_33f993554_time.csv difference percentage
q1 35.86 36.42 0.562 101.57%
q2 24.03 23.70 -0.333 98.62%
q3 36.79 36.61 -0.177 99.52%
q4 36.84 37.83 0.983 102.67%
q5 68.64 70.62 1.980 102.88%
q6 7.70 7.47 -0.232 96.98%
q7 84.61 85.08 0.465 100.55%
q8 85.04 84.30 -0.735 99.14%
q9 123.72 121.00 -2.715 97.81%
q10 44.98 45.37 0.383 100.85%
q11 19.81 19.49 -0.317 98.40%
q12 25.88 26.52 0.644 102.49%
q13 56.41 55.08 -1.331 97.64%
q14 17.54 21.53 3.985 122.71%
q15 31.38 29.02 -2.361 92.47%
q16 14.17 14.00 -0.167 98.82%
q17 103.78 103.62 -0.160 99.85%
q18 145.89 146.24 0.342 100.23%
q19 13.42 13.35 -0.067 99.50%
q20 28.57 29.06 0.490 101.72%
q21 279.91 282.54 2.632 100.94%
q22 15.84 14.61 -1.227 92.26%
total 1300.80 1303.44 2.643 100.20%

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants