
[#1608][part-4] feat(server)(spark3): activate partition reassign when server is inactive #1617

Open
zuston wants to merge 8 commits into master from issue-1608-4
Conversation

zuston
Member

@zuston zuston commented Apr 3, 2024

What changes were proposed in this pull request?

When block-failure reassign retry is enabled for a specified app, the server will fast-fail incoming data requests while its status is inactive.

Why are the changes needed?

Fix: #1608

Sometimes the server is unhealthy due to local disk problems, but clients with existing assignments will still send data to it. If HDFS is enabled as remote storage, the write speed will be slow. This PR solves this by activating the block reassign mechanism.
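The fast-fail idea described above can be sketched as follows. This is a hypothetical illustration, not the actual Uniffle code: the `ServerStatus` values and the guard class name are assumed, and the real check lives inside the gRPC/Netty handlers.

```java
// Assumed names, simplified from the real server-side handlers: reject an
// incoming require-buffer/data request fast when the server is not ACTIVE,
// so the client triggers partition reassignment instead of slow writes.
enum ServerStatus { ACTIVE, DECOMMISSIONING, UNHEALTHY }

class InactiveFastFailGuard {
    private final boolean reassignEnabledForApp;

    InactiveFastFailGuard(boolean reassignEnabledForApp) {
        this.reassignEnabledForApp = reassignEnabledForApp;
    }

    /** Reject only when the app opted into reassign and the server is inactive. */
    boolean shouldReject(ServerStatus status) {
        return reassignEnabledForApp && status != ServerStatus.ACTIVE;
    }
}
```

Apps that have not enabled the reassign retry keep the old behavior and are never rejected by this check.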

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests will be added after #1615 is merged.

@codecov-commenter

codecov-commenter commented Apr 3, 2024

Codecov Report

Attention: Patch coverage is 9.75610%, with 37 lines in your changes missing coverage. Please review.

Project coverage is 54.84%. Comparing base (1051d26) to head (7a1c057).

Files Patch % Lines
...pache/uniffle/server/ShuffleServerGrpcService.java 0.00% 13 Missing ⚠️
...niffle/server/netty/ShuffleServerNettyHandler.java 0.00% 12 Missing ⚠️
...ffle/client/request/RssRegisterShuffleRequest.java 0.00% 5 Missing ⚠️
...ffle/client/impl/grpc/ShuffleServerGrpcClient.java 0.00% 3 Missing ⚠️
.../apache/uniffle/client/api/ShuffleWriteClient.java 0.00% 2 Missing ⚠️
...rg/apache/uniffle/server/ShuffleSpecification.java 66.66% 1 Missing ⚠️
...ava/org/apache/uniffle/server/ShuffleTaskInfo.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1617      +/-   ##
============================================
+ Coverage     53.98%   54.84%   +0.86%     
- Complexity     2872     2873       +1     
============================================
  Files           438      418      -20     
  Lines         24927    22605    -2322     
  Branches       2126     2130       +4     
============================================
- Hits          13456    12397    -1059     
+ Misses        10626     9435    -1191     
+ Partials        845      773      -72     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@zuston zuston changed the title [#1608][part-4] feat(server)(spark3): best effort to resend when server is inactive [#1608][part-4] feat(server)(spark3): activate block resend when server is inactive Apr 3, 2024

github-actions bot commented Apr 3, 2024

Test Results

2 398 files +7 · 2 398 suites +7 · 4h 42m 47s ⏱️ +1m 51s
929 tests +4 · 928 ✅ +4 · 1 💤 ±0 · 0 ❌ ±0
10 761 runs +49 · 10 747 ✅ +49 · 14 💤 ±0 · 0 ❌ ±0

Results for commit 015ab6c. ± Comparison against base commit 30bf8dc.

♻️ This comment has been updated with latest results.

@@ -98,6 +99,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();

if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
Contributor

I think the timing of calling shuffleBufferManager.releaseMemory(req.encodedLength(), false, true); in the code needs to be reconsidered, as there might be some issues here.

Member Author

Could you help check? As you know, I'm not familiar with Netty.

Contributor

I think the code could be:

public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
    RpcResponse rpcResponse;
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    long requireBufferId = req.getRequireId();

    // The codes have been moved here
    ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
    PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
    boolean isPreAllocated = info != null;
    if (isPreAllocated) {
      ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
      shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
    }

    // Your new code should be put here
    ...

    long timestamp = req.getTimestamp();

    if (timestamp > 0) {
      /*
       * Here we record the transport time, but we don't consider the impact of data size on transport time.
       * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
       * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
       * In addition, we need to pay attention to that the time of the client machine and the machine
       * time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
       * */
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime);
      }
    }
    int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
    int requireBlocksSize =
        requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();

    StatusCode ret = StatusCode.SUCCESS;
    String responseMessage = "OK";
    if (req.getPartitionToBlocks().size() > 0) {
      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
      
      // Previous codes have been moved to the top

      if (!isPreAllocated) {
        req.getPartitionToBlocks().values().stream()
            .flatMap(Collection::stream)
            .forEach(block -> block.getData().release());
      }

      ...
}

You can check the logic again. The main purpose is to release the extra memory reserved during the pre-allocation phase immediately upon entering the handleSendShuffleDataRequest method, because this part of memory has already been released when decoding the SendShuffleDataRequest request.
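The invariant behind the suggestion above can be sketched in isolation (assumed names, greatly simplified from the real ShuffleBufferManager): memory reserved at pre-allocation time must be released exactly once, as soon as the decoded request arrives, before any status checks or early returns in the handler.

```java
// Hypothetical accounting sketch: preAllocate() models the requireBuffer
// reservation; releaseOnDecode() models releaseMemory() being called at the
// top of handleSendShuffleDataRequest, regardless of later failure paths.
class PreAllocationAccounting {
    private long reservedBytes;

    /** Reserve memory at requireBuffer time. */
    void preAllocate(long size) {
        reservedBytes += size;
    }

    /** Release the reservation exactly once, when the request is decoded. */
    void releaseOnDecode(long size) {
        reservedBytes -= Math.min(size, reservedBytes);
    }

    long reservedBytes() {
        return reservedBytes;
    }
}
```

If the release instead happens on a later branch, an early return (such as the new inactive-server check) would leak the reservation.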

@dingshun3016
Contributor

This function is very useful, but I'm a little worried: when all apps in the cluster have this enabled and server a1 becomes inactive, its partitions will be immediately reassigned to another server a2, and all apps on server a1 will write to server a2. Will server a2 also become inactive soon? Could this cause a server storm?

@zuston
Member Author

zuston commented Apr 11, 2024

This function is very useful, but I'm a little worried: when all apps in the cluster have this enabled and server a1 becomes inactive, its partitions will be immediately reassigned to another server a2, and all apps on server a1 will write to server a2. Will server a2 also become inactive soon? Could this cause a server storm?

Emm... actually, it fails over to multiple servers rather than a single replacement server.
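The point about failing over to multiple servers can be illustrated with a toy round-robin distribution (assumed names; the real reassignment goes through the coordinator's assignment logic, not this helper):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: blocks that failed against an inactive server are
// spread across all candidate servers instead of being funneled to one
// replacement, which is why a "server storm" on a single node is unlikely.
class FailoverSketch {
    static List<String> assignTargets(int failedBlockCount, List<String> candidates) {
        List<String> targets = new ArrayList<>();
        for (int i = 0; i < failedBlockCount; i++) {
            // Round-robin over the candidate list.
            targets.add(candidates.get(i % candidates.size()));
        }
        return targets;
    }
}
```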

@zuston zuston changed the title [#1608][part-4] feat(server)(spark3): activate block resend when server is inactive [#1608][part-4] feat(server)(spark3): activate partition reassign when server is inactive Apr 23, 2024
@zuston zuston force-pushed the issue-1608-4 branch 2 times, most recently from 2bac7bc to 1c20795 Compare April 24, 2024 03:17
@zuston
Member Author

zuston commented May 9, 2024

PTAL @xumanbu @dingshun3016 @rickyma @jerqi

@@ -468,7 +468,7 @@ public class ShuffleServerConf extends RssBaseConf {
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to activate client partition reassign mechanism for server quick decommission");
"Whether to activate client partition reassign mechanism for server quick decommission or inactive.");
Contributor

Modify the document.

Member Author

For this config option, I'm not sure whether it should be grouped under decommission.

Member Author

Modify the document.

For the client partition reassign mechanism, I will propose another PR to finish this, and the documentation update will be included in it.

@@ -305,6 +306,7 @@ enum StatusCode {
ACCESS_DENIED = 8;
INVALID_REQUEST = 9;
NO_BUFFER_FOR_HUGE_PARTITION = 10;
SERVER_INACTIVE = 11;
Contributor

What's the meaning of this?

Contributor

Is ACCESS_DENIED suitable for this case?

Member Author

Sounds good.

Comment on lines 431 to 444
if (shuffleServer.isActivateClientPartitionReassign()
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
responseObserver.onNext(
RequireBufferResponse.newBuilder()
.setStatus(StatusCode.SERVER_INACTIVE.toProto())
.setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
.build());
responseObserver.onCompleted();
return;
}

Contributor

This may easily trigger the blockFailSentRetryMaxTimes limit during reassignment.

Member Author

Why? It's rare for a server to be unhealthy.

Contributor

In our production environment, our health check uses HealthScriptChecker. Servers become unhealthy quite frequently, for example when high load, high network traffic, or disk utilization crosses a threshold.

Member Author

In our production environment, our health check uses HealthScriptChecker. Servers become unhealthy quite frequently, for example when high load, high network traffic, or disk utilization crosses a threshold.

Yes, I introduced an extra config option to control whether this feature is enabled, so you can disable it.
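For reference, the blockFailSentRetryMaxTimes concern boils down to a per-block retry cap, roughly like this sketch (only the config name comes from the discussion above; the class and method names are assumed, and the real client-side bookkeeping is more involved):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-block resend cap: once a block has failed
// blockFailSentRetryMaxTimes times, further resends are refused and the
// task fails. Frequent inactive->reject cycles consume this budget quickly.
class BlockRetrySketch {
    private final int blockFailSentRetryMaxTimes;
    private final Map<Long, Integer> retries = new HashMap<>();

    BlockRetrySketch(int maxTimes) {
        this.blockFailSentRetryMaxTimes = maxTimes;
    }

    /** Returns true if the block may be resent; false once the cap is reached. */
    boolean tryRetry(long blockId) {
        int count = retries.merge(blockId, 1, Integer::sum);
        return count <= blockFailSentRetryMaxTimes;
    }
}
```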

@zuston
Member Author

zuston commented May 11, 2024

All done @jerqi

@jerqi
Contributor

jerqi commented May 11, 2024

In general, I want to discuss some design ideas.
What is the scope of this pull request: unhealthy nodes? Decommissioned nodes? You introduce a new concept, the "inactive node", but no such concept exists in our system today. "Inactive node" is too general; could you give it a clearer definition?

@zuston
Member Author

zuston commented May 11, 2024

In general, I want to discuss some design ideas. What is the scope of this pull request: unhealthy nodes? Decommissioned nodes? You introduce a new concept, the "inactive node", but no such concept exists in our system today. "Inactive node" is too general; could you give it a clearer definition?

Oh yes. Inactive status means the server is not active, covering all non-ACTIVE status codes, such as decommissioning, unhealthy, and so on.

Actually, this PR could be scoped to unhealthy servers. Let's discuss this point.


Successfully merging this pull request may close these issues.

Support client partition data reassign
6 participants