[VL] Support celeborn sort based shuffle #5675
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
a34f4f9 to 3bed73f
Run Gluten Clickhouse CI |
3bed73f to dbd0f80
dbd0f80 to d6cf16d
d6cf16d to b140199
Thank you for the PR! We have been waiting for this for a long time.
@@ -132,6 +132,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;

arrow::Status sort(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;
We could separate the sort-based and hash-based shuffle into different files.
@FelixYBW Once the sort-based shuffle proposal has been reviewed, I'll refactor the code so that the two implementations live in separate files.
@kerwin-zk After this patch, some of the existing APIs are used only for hash-based shuffle, while the newly added ones are used only for sort-based shuffle, so the two code paths end up mixed together. It would be nice to separate the APIs for both the writer and the reader.
@marin-ma The shuffle writer API has been split into sort and hash variants. The shuffle reader only adds Velox stream parsing and doesn't differentiate between sort and hash; the core logic is the same, so I don't see a need to split it out separately.
828b059 to 81771a4
arrow::Status VeloxShuffleWriter::stop() {
arrow::Status VeloxShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit) {
  currentInputColumnBytes_ += rv->estimateFlatSize();
  batches_.push_back(rv);
Quick question: it looks like this only enqueues the batches. Is the sort done on the Celeborn side?
@zhouyuan In HashPartitioner's compute, each row of the RowVector is encoded as a single integer, and the rows that belong to the same partition ID are appended to that partition's vector in the map. This is equivalent to replacing the sort with a hash map.
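To illustrate the encoding described above, here is a minimal standalone sketch. It assumes the layout suggested by the snippets in this PR (batch index in the high 32 bits, row index in the low 32 bits). The helper names `packRowRef`, `unpackVectorIndex`, `unpackRowIndex` and `groupRowsByPartition` are hypothetical and are not the functions in the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Pack a batch (vector) index and a row index into one 64-bit value:
// high 32 bits = batch index, low 32 bits = row index.
inline int64_t packRowRef(uint32_t vectorIndex, uint32_t rowIndex) {
  return static_cast<int64_t>((static_cast<uint64_t>(vectorIndex) << 32) | rowIndex);
}

inline uint32_t unpackVectorIndex(int64_t ref) {
  return static_cast<uint32_t>(static_cast<uint64_t>(ref) >> 32);
}

inline uint32_t unpackRowIndex(int64_t ref) {
  return static_cast<uint32_t>(static_cast<uint64_t>(ref) & 0xFFFFFFFFULL);
}

// Hypothetical helper: bucket every row of one input batch by partition id.
// pids[i] is the raw partition key of row i; rows are appended in input order,
// so each bucket stays ordered by (batch index, row index) without a sort.
inline void groupRowsByPartition(
    uint32_t vectorIndex,
    const std::vector<int32_t>& pids,
    int32_t numPartitions,
    std::unordered_map<int32_t, std::vector<int64_t>>& rowsByPartition) {
  assert(numPartitions > 0);
  for (uint32_t row = 0; row < pids.size(); ++row) {
    int32_t pid = pids[row] % numPartitions;
    if (pid < 0) {
      pid += numPartitions;  // keep the partition id non-negative
    }
    rowsByPartition[pid].push_back(packRowRef(vectorIndex, row));
  }
}
```

Because batches are processed in arrival order, each per-partition vector is already grouped and ordered when it is time to evict, which is why no explicit sort step is needed in this sketch.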
Could you add C++ unit tests for sort-based shuffle? If it's possible to implement another LocalRssClient for sort-based shuffle and use it in the native tests, we can reuse the existing tests by adding a new test parameter in VeloxShuffleWriterTest.
cpp/core/jni/JniWrapper.cc
Outdated
@@ -148,6 +149,69 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream {
  bool closed_ = false;
};

class JavaInputStreamVeloxWrapper final : public JavaInputStreamWrapper {
Can we use JavaInputStreamAdaptor?
done
The existing JavaInputStreamAdaptor is defined here:
class JavaInputStreamAdaptor final : public arrow::io::InputStream {
@marin-ma done
cpp/velox/compute/VeloxRuntime.cc
Outdated
return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
schema,
std::move(codec),
options.compressionTypeStr,
As it's in the Velox backend, we can put the str->kind conversion here and store the velox::common::CompressionKind instead of the compression string.
done
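For readers following along, the suggestion amounts to converting the string once when the options are built and carrying an enum afterwards. The sketch below only illustrates that idea: `CompressionKindSketch`, `toCompressionKind` and `ReaderOptionsSketch` are hypothetical stand-ins, not the Velox enum or the code that landed in this PR.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-in for velox::common::CompressionKind.
enum class CompressionKindSketch { kNone, kLz4, kZstd };

// Convert the configured compression name once, at option-parsing time.
inline CompressionKindSketch toCompressionKind(const std::string& name) {
  if (name.empty() || name == "none") {
    return CompressionKindSketch::kNone;
  }
  if (name == "lz4") {
    return CompressionKindSketch::kLz4;
  }
  if (name == "zstd") {
    return CompressionKindSketch::kZstd;
  }
  throw std::invalid_argument("Unsupported compression type: " + name);
}

// The options then carry the enum instead of the raw string, so the reader
// never has to re-parse or re-validate it.
struct ReaderOptionsSketch {
  CompressionKindSketch compressionKind = CompressionKindSketch::kLz4;
};
```

Doing the conversion at the boundary means an unsupported name fails early, before any shuffle data is read.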
A few comments.
cpp/core/shuffle/HashPartitioner.cc
Outdated
std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
  auto index = static_cast<int64_t>(vectorIndex) << 32;
  for (auto i = 0; i < numRows; ++i) {
    auto pid = pidArr[i] % numPartitions_;
These lines are the same as in the function above. Could you extract the common part?
done
facebook::velox::OStreamOutputStream* out,
facebook::velox::RowTypePtr* rowTypePtr) {
  int64_t rawSize = batch_->size();
  batch_->flush(out);
According to the PrestoSerializer implementation, it looks like there's an unavoidable extra memory copy here, from the intermediate buffer used by batch_->append to std::ostringstream* output. In addition, the memory used by std::ostringstream* output is not being tracked. Is there any way to track the memory usage?
We can use IOBufOutputStream so that the memory can be tracked by the Velox memory pool.
@kerwin-zk I just found that the data stored in IOBufOutputStream might not be contiguous. Let's keep the current implementation for now. I will find a better way to resolve it.
@marin-ma That's the problem. If it's not contiguous, there's an extra copy.
for (int start = 0; start < outputSize; start++) {
  const int32_t vectorIndex = static_cast<int32_t>(rowVectorIndex.at(start) >> 32);
  const int32_t rowIndex = static_cast<int32_t>(rowVectorIndex.at(start) & 0xFFFFFFFFLL);
  groupedIndices[vectorIndex].push_back({rowIndex, 1});
The mapping from input vector index + row index is already ordered. Can we build the index ranges directly in Partitioner::compute? Then adjacent row indices with the same partition ID can be merged into a single range.
@marin-ma Partitioner::compute belongs to the core part of the C++ code, so it is not appropriate to introduce the Velox-specific IndexRange there. I will implement this logic in evictRowVector.
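As a rough illustration of the merging discussed in this thread, the sketch below collapses an ordered list of row indices into {begin, size} ranges. `RowRange` is a hypothetical stand-in for Velox's IndexRange and `mergeAdjacentRows` is an assumed helper name; this is not the code in evictRowVector.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a Velox-style index range: rows [begin, begin + size).
struct RowRange {
  int32_t begin;
  int32_t size;
};

// Merge strictly increasing row indices into contiguous ranges, so that
// N adjacent rows become one range instead of N single-row ranges.
inline std::vector<RowRange> mergeAdjacentRows(const std::vector<int32_t>& sortedRows) {
  std::vector<RowRange> ranges;
  for (int32_t row : sortedRows) {
    if (!ranges.empty()) {
      RowRange& last = ranges.back();
      // Precondition: the input is strictly increasing.
      assert(row >= last.begin + last.size);
      if (row == last.begin + last.size) {
        ++last.size;  // row directly follows the current range: extend it
        continue;
      }
    }
    ranges.push_back({row, 1});  // gap (or first row): start a new range
  }
  return ranges;
}
```

For example, rows 0, 1, 2, 5, 7, 8 would become three ranges instead of six single-row entries, which reduces the number of copy operations when the rows are gathered.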
2923db6 to f9d73e6
A few comments. Thanks.
cpp/core/shuffle/ShuffleReader.cc
Outdated
@@ -16,6 +16,9 @@
*/

#include "ShuffleReader.h"

#include <jni/JniCommon.h>
Unintended change?
params.push_back(ShuffleTestParams{PartitionWriterType::kRss, compression, compressionThreshold, 0});
params.push_back(ShuffleTestParams{
    ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0});
params.push_back(ShuffleTestParams{
Looks like the "compressionThreshold" is not used in the "sort + rss" code path. We can move this combination out of this loop to reduce the number of test cases.
std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
if (GetParam().shuffleWriterType == kHashShuffle) {
  GLUTEN_ASSIGN_OR_THROW(
      shuffleWriter,
      VeloxHashBasedShuffleWriter::create(
          kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool));
} else if (
    GetParam().shuffleWriterType == kSortShuffle && GetParam().partitionWriterType == PartitionWriterType::kRss) {
  GLUTEN_ASSIGN_OR_THROW(
      shuffleWriter,
      VeloxSortBasedShuffleWriter::create(
          kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool));
}
Please extract this logic into a function to avoid copy-paste.
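One possible shape for such a helper is sketched below. All types here (`ShuffleWriterSketch`, `HashWriterSketch`, `SortWriterSketch`) and the function `createShuffleWriterSketch` are hypothetical stand-ins so that the example compiles on its own; the real test would call the VeloxHashBasedShuffleWriter and VeloxSortBasedShuffleWriter factories with their actual arguments.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

enum class ShuffleWriterType { kHashShuffle, kSortShuffle };
enum class PartitionWriterType { kLocal, kRss };

// Hypothetical stand-ins for the two writer implementations.
struct ShuffleWriterSketch {
  virtual ~ShuffleWriterSketch() = default;
  virtual std::string name() const = 0;
};
struct HashWriterSketch final : ShuffleWriterSketch {
  std::string name() const override { return "hash"; }
};
struct SortWriterSketch final : ShuffleWriterSketch {
  std::string name() const override { return "sort"; }
};

// Single place that maps the test parameters to a concrete writer, so each
// test calls this helper instead of repeating the if/else chain.
inline std::shared_ptr<ShuffleWriterSketch> createShuffleWriterSketch(
    ShuffleWriterType writerType,
    PartitionWriterType partitionWriterType) {
  switch (writerType) {
    case ShuffleWriterType::kHashShuffle:
      return std::make_shared<HashWriterSketch>();
    case ShuffleWriterType::kSortShuffle:
      // In this PR the sort-based writer is only exercised with the RSS partition writer.
      if (partitionWriterType != PartitionWriterType::kRss) {
        throw std::invalid_argument("sort-based shuffle requires the RSS partition writer");
      }
      return std::make_shared<SortWriterSketch>();
  }
  throw std::invalid_argument("unknown shuffle writer type");
}
```

Failing loudly on an unsupported combination also avoids the case in the quoted snippet where `shuffleWriter` stays null if neither branch matches.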
Object pusher,
String partitionWriterType);
String partitionWriterType,
String shuffleWriterType);
Maybe we can consider separating the JNI call for hash/sort shuffle in the future.
@kerwin-zk It's weird that I cannot reproduce the UT failure in my dev environment (Ubuntu 22.04), but I can reproduce it in the Docker container (CentOS Stream 8).
@marin-ma Yes, I am checking this issue today. I tested on CentOS 8 and Ubuntu 20.04, but could not reproduce it in my local development environment.
@kerwin-zk Do you mean it cannot be reproduced on CentOS 8? Which GCC version do you use? It's gcc-9 in Docker: https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_docker.yml#L524
@marin-ma CentOS 8 with GCC 10.
LGTM. Some minor comments. Thanks!
cpp/core/shuffle/HashPartitioner.cc
Outdated
@@ -16,9 +16,28 @@
*/

#include "shuffle/HashPartitioner.h"
#include <iostream>
Please remove.
@@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict(
  return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) {
  return arrow::Status::OK();
Return arrow::Status::NotImplemented("Invalid code path for local shuffle writer: sort based is not supported.");
enum PartitionWriterType { kLocal, kRss };

struct ShuffleReaderOptions {
  arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
  std::string compressionTypeStr = "lz4";
Please add static const std::string kDefaultCompressionTypeStr = "lz4"; in the code above and use kDefaultCompressionTypeStr here.
cpp/core/shuffle/Options.h
Outdated
@@ -56,6 +63,7 @@ struct PartitionWriterOptions {

  int32_t compressionThreshold = kDefaultCompressionThreshold;
  arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
  std::string compressionTypeStr = "lz4";
kDefaultCompressionTypeStr
@@ -46,7 +46,7 @@ class VeloxColumnarBatch final : public ColumnarBatch {
  facebook::velox::RowVectorPtr getRowVector() const;
  facebook::velox::RowVectorPtr getFlattenedRowVector();

 private:
 public:
Unintended change?
#if VELOX_SHUFFLE_WRITER_PRINT

#define VsPrint Print
#define VsPrintLF PrintLF
#define VsPrintSplit PrintSplit
#define VsPrintSplitLF PrintSplitLF
#define VsPrintVectorRange PrintVectorRange
#define VS_PRINT PRINT
#define VS_PRINTLF PRINTLF
#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
#define VS_PRINT_CONTAINER PRINT_CONTAINER
#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING

#else // VELOX_SHUFFLE_WRITER_PRINT

#define VsPrint(...) // NOLINT
#define VsPrintLF(...) // NOLINT
#define VsPrintSplit(...) // NOLINT
#define VsPrintSplitLF(...) // NOLINT
#define VsPrintVectorRange(...) // NOLINT
#define VS_PRINT(a)
#define VS_PRINTLF(a)
#define VS_PRINT_FUNCTION_NAME()
#define VS_PRINT_FUNCTION_SPLIT_LINE()
#define VS_PRINT_CONTAINER(c)
#define VS_PRINT_CONTAINER_TO_STRING(c)
#define VS_PRINT_CONTAINER_2_STRING(c)
#define VS_PRINT_VECTOR_TO_STRING(v)
#define VS_PRINT_VECTOR_2_STRING(v)
#define VS_PRINT_VECTOR_MAPPING(v)

#endif // end of VELOX_SHUFFLE_WRITER_PRINT
Are these macros used by the sort shuffle writer? If not, please remove.
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
What changes were proposed in this pull request?
#4048
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)