[#1608][part-4] feat(server)(spark3): activate partition reassign when server is inactive #1617
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
## master #1617 +/- ##
============================================
+ Coverage 53.98% 54.84% +0.86%
- Complexity 2872 2873 +1
============================================
Files 438 418 -20
Lines 24927 22605 -2322
Branches 2126 2130 +4
============================================
- Hits 13456 12397 -1059
+ Misses 10626 9435 -1191
+ Partials 845 773 -72
```
@@ -98,6 +99,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
   public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
     RpcResponse rpcResponse;
     String appId = req.getAppId();

+    if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
```
I think the timing of calling shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
in the code needs to be reconsidered, as there might be some issues here.
Could you help check? I'm not familiar with Netty.
I think the code could be:

```java
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
  RpcResponse rpcResponse;
  String appId = req.getAppId();
  int shuffleId = req.getShuffleId();
  long requireBufferId = req.getRequireId();
  // The codes have been moved here
  ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
  PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
  boolean isPreAllocated = info != null;
  if (isPreAllocated) {
    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
    shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
  }
  // Your new code should be put here
  ...
  long timestamp = req.getTimestamp();
  if (timestamp > 0) {
    /*
     * Here we record the transport time, but we don't consider the impact of data size on transport time.
     * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
     * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
     * In addition, we need to pay attention to that the time of the client machine and the machine
     * time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
     */
    long transportTime = System.currentTimeMillis() - timestamp;
    if (transportTime > 0) {
      shuffleServer
          .getNettyMetrics()
          .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
    }
  }
  int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
  int requireBlocksSize =
      requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
  StatusCode ret = StatusCode.SUCCESS;
  String responseMessage = "OK";
  if (req.getPartitionToBlocks().size() > 0) {
    ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
    // Previous codes have been moved to the top
    if (!isPreAllocated) {
      req.getPartitionToBlocks().values().stream()
          .flatMap(Collection::stream)
          .forEach(block -> block.getData().release());
    }
    ...
  }
}
```
You can check the logic again. The main purpose is to release the extra memory reserved during the pre-allocation phase immediately upon entering the `handleSendShuffleDataRequest` method, because this part of memory has already been released when decoding the `SendShuffleDataRequest` request.
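The size accounting in the handler above can be checked with a small standalone sketch. This is a hypothetical, simplified extraction (the class and method names are mine, not part of the codebase); only the clamp-to-zero logic mirrors the `requireBlocksSize` computation in the handler:

```java
public class RequireBufferAccounting {
    // Mirrors the handler's computation: the block payload size is the
    // reserved buffer size minus the bytes already counted for the encoded
    // request, clamped so it never goes negative.
    public static int requireBlocksSize(int requireSize, int encodedLength) {
        return Math.max(0, requireSize - encodedLength);
    }

    public static void main(String[] args) {
        System.out.println(requireBlocksSize(1024, 100)); // 924
        System.out.println(requireBlocksSize(50, 100));   // 0, not -50
    }
}
```

The clamp matters because the request's encoded length can exceed the reserved size, and a negative value would corrupt the received-data-size metric.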
This function is very useful, but I'm a little worried: when all apps in the cluster have this enabled, if server a1 becomes inactive, its partitions will immediately be reassigned to another server a2, and all apps on server a1 will write to server a2. Will server a2 also become inactive soon? Could this cause a server storm?
Emm... Actually, it will fail over to multiple servers rather than a single replacement server.
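The point of this reply is that reassignment spreads an inactive server's partitions across the remaining active servers rather than dumping them all onto one replacement. A minimal round-robin sketch of that idea (class and method names are hypothetical, not the actual reassign implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReassignSketch {
    // Spread the inactive server's partitions round-robin over the active
    // servers so no single server absorbs the whole failed-over load.
    public static Map<String, List<Integer>> reassign(
            List<Integer> partitions, List<String> activeServers) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            String target = activeServers.get(i % activeServers.size());
            result.computeIfAbsent(target, k -> new ArrayList<>()).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Partitions of inactive server a1, failed over to a2 and a3.
        Map<String, List<Integer>> m =
            reassign(Arrays.asList(0, 1, 2, 3), Arrays.asList("a2", "a3"));
        System.out.println(m); // partitions split between a2 and a3
    }
}
```

With more than one target server, no single replacement receives all of a1's traffic, which is why the "server storm" scenario is less likely than the question assumes.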
```
@@ -468,7 +468,7 @@ public class ShuffleServerConf extends RssBaseConf {
           .booleanType()
           .defaultValue(false)
           .withDescription(
-              "Whether to activate client partition reassign mechanism for server quick decommission");
+              "Whether to activate client partition reassign mechanism for server quick decommission or inactive.");
```
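The diff above only updates the option's description string. To illustrate the builder pattern it belongs to, here is a self-contained stand-in (the `BoolOption` class and the config key name are hypothetical; the real project uses its own `ConfigOption` builder and key):

```java
public class ConfigOptionSketch {
    // Minimal stand-in for a boolean config option defined via a builder-style
    // pattern like the one in the diff: type, default value, description.
    public static final class BoolOption {
        public final String key;
        public final boolean defaultValue;
        public final String description;

        public BoolOption(String key, boolean defaultValue, String description) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.description = description;
        }
    }

    public static BoolOption clientReassignOption() {
        return new BoolOption(
            "rss.server.partition.reassign.enabled", // hypothetical key name
            false, // opt-in: the feature is off unless explicitly enabled
            "Whether to activate client partition reassign mechanism for server quick decommission or inactive.");
    }

    public static void main(String[] args) {
        System.out.println(clientReassignOption().defaultValue); // false
    }
}
```

Because the default is `false`, clusters that do not want fast-fail-on-inactive behavior are unaffected unless they explicitly turn it on.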
Modify the document.
For this config option, I'm not sure whether this should be grouped as `decommission`.
> Modify the document.

For the client partition reassign mechanism, I will propose another PR to finish it, and this change will also be included there.
proto/src/main/proto/Rss.proto
```
@@ -305,6 +306,7 @@ enum StatusCode {
   ACCESS_DENIED = 8;
   INVALID_REQUEST = 9;
   NO_BUFFER_FOR_HUGE_PARTITION = 10;
+  SERVER_INACTIVE = 11;
```
What's the meaning of this?
Is `ACCESS_DENIED` suitable for this case?
Sounds good.
```java
if (shuffleServer.isActivateClientPartitionReassign()
    && shuffleServer
        .getShuffleTaskManager()
        .getShuffleTaskInfo(appId)
        .isBlockFailureReassignEnabled()) {
  responseObserver.onNext(
      RequireBufferResponse.newBuilder()
          .setStatus(StatusCode.SERVER_INACTIVE.toProto())
          .setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
          .build());
  responseObserver.onCompleted();
  return;
}
```
This may easily trigger the restriction of `blockFailSentRetryMaxTimes` in reassign.
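The concern here is that every `SERVER_INACTIVE` response consumes one block-send retry, so a server that flaps between healthy and unhealthy can exhaust the retry budget quickly. A hypothetical sketch of that budget logic (the class is illustrative; only the `blockFailSentRetryMaxTimes` name comes from the discussion):

```java
public class BlockSendRetryBudget {
    private final int maxTimes; // corresponds to blockFailSentRetryMaxTimes
    private int failures = 0;

    public BlockSendRetryBudget(int maxTimes) {
        this.maxTimes = maxTimes;
    }

    // Called each time a send fails with SERVER_INACTIVE.
    // Returns whether another retry is still allowed.
    public boolean onServerInactive() {
        failures++;
        return failures <= maxTimes;
    }

    public static void main(String[] args) {
        BlockSendRetryBudget budget = new BlockSendRetryBudget(2);
        System.out.println(budget.onServerInactive()); // true: 1st failure
        System.out.println(budget.onServerInactive()); // true: 2nd failure
        System.out.println(budget.onServerInactive()); // false: budget exhausted
    }
}
```

With a frequently-unhealthy server, three inactive responses already exhaust a budget of two, which is why making the fast-fail behavior configurable (as discussed below) matters.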
Why? It's rare that a server is not healthy.
In our production environment, our health check uses HealthScriptChecker. The server becomes unhealthy very frequently, for example when high load, high network card traffic, or disk utilization triggers a threshold.
> In our production environment, our health check uses HealthScriptChecker. The server becomes unhealthy very frequently, for example when high load, high network card traffic, or disk utilization triggers a threshold.
Yes, I introduced the extra config to control whether to enable this feature. So you could disable this.
All done @jerqi
In general, I want to discuss some design ideas.
Oh yes. Inactive status means that the server is not active, covering all non-active status codes, like decommission/unhealthy and so on. Actually this PR could be scoped to the unhealthy server. Let's discuss this point.
What changes were proposed in this pull request?
When the block failure reassign retry is enabled for a specified app, the server will fast-fail on accepting data when its status is inactive.
Why are the changes needed?
Fix: #1608
Sometimes the server is unhealthy due to its local disks, but the existing assignment will still send data to it. If HDFS is enabled as remote storage, the write speed will be slow. This PR solves this by activating the block reassign mechanism.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests will be added after #1615 is merged.