[SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

### What changes were proposed in this pull request?

Implement server support and a Scala client for reattachable execution, with two new RPCs:
* ReattachExecute
* ReleaseExecute

With reattachable execution, if the response stream of an ExecutePlan RPC gets broken, the client can reattach to it with a ReattachExecute RPC. The server now holds a small backtrack buffer, in case a client coming back with a reattach after an error did not receive the latest sent responses and has to backtrack. The client can send ReleaseExecute RPCs to let the server know what has already been consumed and can be freed.

#### Server: ExecuteResponseObserver

If the ExecuteHolder is reattachable, ExecuteResponseObserver is modified to not throw away responses immediately after they are returned to ExecuteGrpcResponseSender, but to keep a buffer of `spark.connect.execute.reattachable.observerRetryBufferSize` (default 1 MB) of responses behind the last returned response. This allows a returning consumer to backtrack in case the last returned response did not successfully arrive at the client.

#### Server: ExecuteGrpcResponseSender

If the ExecuteHolder is reattachable, ExecuteGrpcResponseSender is modified to gracefully end the RPC response stream after a given time (`spark.connect.execute.reattachable.senderMaxStreamDuration`, default 5 minutes) or total size of responses (`spark.connect.execute.reattachable.senderMaxStreamSize`, default 1 GB). After that, the response stream is completed, and the client needs to come back with a ReattachExecute RPC to continue. Clients know whether they have reached the real end of the results, because ExecuteThreadRunner now puts a marker ResultComplete message at the real end of the stream. Cutting the stream gracefully like this helps prevent errors related to long-open streams, and also helps with long-held idle streams that may get broken because of idleness.
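The backtrack-buffer idea above can be sketched as follows. This is a minimal illustration in Python (the actual implementation is in the Scala server; all names here are illustrative, and the real ExecuteResponseObserver tracks protobuf responses, not raw bytes): responses stay buffered after being handed to the sender, and only those more than a retry-buffer's worth of bytes behind the last returned response are freed.

```python
# Sketch of the retry/backtrack buffer kept by the response observer.
# Hypothetical names; not the actual Spark Connect classes.

class ResponseObserverSketch:
    def __init__(self, retry_buffer_size=1 * 1024 * 1024):
        self.retry_buffer_size = retry_buffer_size
        self.responses = {}      # index -> payload still held by the server
        self.sizes = {}          # index -> payload size in bytes
        self.next_index = 0
        self.last_returned = -1  # highest index already handed to the sender

    def on_next(self, payload: bytes):
        """Called as execution produces responses."""
        self.responses[self.next_index] = payload
        self.sizes[self.next_index] = len(payload)
        self.next_index += 1

    def consume(self, index: int) -> bytes:
        """Hand response `index` to the sender, then trim old responses."""
        payload = self.responses[index]
        self.last_returned = max(self.last_returned, index)
        self._trim()
        return payload

    def _trim(self):
        # Walk backwards from the last returned response, accumulating size.
        # Once the accumulated size exceeds the retry buffer limit, everything
        # at or below that point can no longer be backtracked to and is freed.
        kept = 0
        for i in range(self.last_returned, -1, -1):
            if i not in self.responses:
                break
            kept += self.sizes[i]
            if kept > self.retry_buffer_size:
                for j in range(i, -1, -1):
                    self.responses.pop(j, None)
                    self.sizes.pop(j, None)
                break
```

A reattaching client that asks to restart after an index still present in `responses` can be served from this buffer; anything older has been released and can no longer be backtracked to.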
The client is able to handle these errors, but this also provides a cleaner way to minimize the number of network errors.

ExecuteGrpcResponseSender also implements flow control of responses sent to the gRPC stream. Normally, grpcResponseObserver.onNext would never block. That does not, however, mean it would send all responses immediately: responses can be queued if the client does not keep up receiving and processing them. This queue can grow unbounded, and we don't know what was actually sent, so we don't know how much of a buffer ExecuteResponseObserver really needs to keep. Flow control uses the facilities of ServerCallStreamObserver, which provides a way to check whether the stream is actually ready to send data, and callbacks to notify when it becomes ready. ExecuteGrpcResponseSender waits to call grpcResponseObserver.onNext until the stream becomes ready.

This also helps with clients that abandon their result stream. Instead of pushing responses to a gRPC outbound queue that is outside our control, we now block from sending them and time out the RPC after senderMaxStreamDuration. The TODO server mechanism would then remove the execution with no RPC attached. Note: in such a situation, responses will still pile up in ExecuteResponseObserver; there is unfortunately no flow control of the Spark task results being pushed to it.

#### Client: ExecutePlanResponseReattachableIterator

The client wraps the whole implementation inside ExecutePlanResponseReattachableIterator. It is a wrapper that starts with an ExecutePlan result iterator, but if that iterator either finishes without a ResultComplete message or an intermittent network error occurs, it can acquire a new iterator to continue where it left off: by tracking the last received response, it can request a new iterator from ReattachExecute that starts right after it.
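The reattach loop described above can be sketched as a plain iterator wrapper. This is a hedged Python sketch, not the real ExecutePlanResponseReattachableIterator (which is a Scala gRPC client; the Python client is a followup): `open_stream(last_seen_id)` stands in for the initial ExecutePlan and subsequent ReattachExecute calls, and each response is modeled as an `(id, payload, is_last)` tuple, with `is_last` playing the role of the ResultComplete marker.

```python
# Sketch of the client-side reattach loop: remember the id of the last
# response received, and on a stream break (or a stream that ends without
# ResultComplete) open a new stream starting right after that id.
# All names are illustrative, not the real client API.

class StreamBroken(Exception):
    """Stands in for an intermittent gRPC network error."""

class ReattachableIteratorSketch:
    def __init__(self, open_stream):
        # open_stream(last_seen_id) -> iterator of (id, payload, is_last)
        self.open_stream = open_stream
        self.last_seen_id = None
        self.stream = open_stream(None)   # initial ExecutePlan stream
        self.done = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.done:
            raise StopIteration
        while True:
            try:
                resp_id, payload, is_last = next(self.stream)
            except StreamBroken:
                # Intermittent error: reattach after the last response seen.
                self.stream = self.open_stream(self.last_seen_id)
                continue
            except StopIteration:
                # Stream ended gracefully (e.g. max duration/size reached)
                # without ResultComplete: reattach to continue.
                self.stream = self.open_stream(self.last_seen_id)
                continue
            self.last_seen_id = resp_id
            if is_last:                   # the ResultComplete marker
                self.done = True
            return payload

def make_flaky_server(responses, break_after=2):
    """Test double: serves `responses`, breaking the first stream mid-way."""
    state = {"first": True}
    def open_stream(last_seen_id):
        start = 0 if last_seen_id is None else last_seen_id + 1
        def gen():
            for n, resp in enumerate(responses[start:]):
                if state["first"] and n == break_after:
                    state["first"] = False
                    raise StreamBroken()
                yield resp
        return gen()
    return open_stream
```

With the flaky test double above, the wrapper transparently survives the mid-stream break and the consumer sees one uninterrupted sequence of payloads.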
The client sends ReleaseExecute RPCs asynchronously to release the responses it has already consumed (potentially faster than ExecuteResponseObserver would release them from its backtrack buffer). The client doesn't need to check their result or block on them; if something fails, the executions will get cleaned up by the TODO cleanup mechanism on the server.

#### TODO as followup

* What remains TODO for a followup PR is a server mechanism for dealing with executions that have been abandoned: the client did not come back with ReattachExecute and did not free the execution with ReleaseExecute. This will be done by an operation manager that monitors all active executions, and interrupts and removes the ones that were not released but have had no client RPC attached to them for a while.
* The Python client implementation is a followup.

### Why are the changes needed?

Robust handling of long-running queries in the face of intermittent network glitches.

### Does this PR introduce _any_ user-facing change?

The reattachable execution mechanism is not exposed in any public API. Several internal configs have been added.

Connect server configs:
* `spark.connect.execute.reattachable.senderMaxStreamDuration`
* `spark.connect.execute.reattachable.senderMaxStreamSize`
* `spark.connect.execute.reattachable.observerRetryBufferSize`

Connect Scala client configuration element: `useReattachableExecute`

### How was this patch tested?

The existing E2E suites utilize reattachable execution. The tests have been configured to exercise reattaching by using an extremely low senderMaxStreamDuration and senderMaxStreamSize.

Closes apache#42228 from juliuszsompolski/SPARK-44421.

Lead-authored-by: Juliusz Sompolski <julek@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
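The fire-and-forget nature of the ReleaseExecute calls can be sketched as follows. This is an illustrative Python sketch, not the real client code: `send_release` stands in for the ReleaseExecute RPC, and the point is that the call runs on a background thread and a failure is deliberately swallowed, since an unreleased execution would eventually be reclaimed by the server-side cleanup mechanism anyway.

```python
# Sketch of asynchronous, best-effort ReleaseExecute calls.
# `send_release` is a hypothetical stand-in for the real RPC.

import threading

def release_async(send_release, response_id, on_error=None):
    """Asynchronously tell the server everything up to response_id is consumed."""
    def run():
        try:
            send_release(response_id)
        except Exception as exc:
            # Best effort: a failed release is not fatal. The server-side
            # cleanup mechanism will eventually reclaim the execution.
            if on_error is not None:
                on_error(exc)
    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t   # caller may join in tests, but never needs to block on it
```

The caller never blocks on the release and never sees its errors, which matches the description above: the client only needs the releases to succeed often enough to keep the server's backtrack buffer small.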