
[SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect #42228

Closed

Conversation

juliuszsompolski
Contributor

@juliuszsompolski juliuszsompolski commented Jul 30, 2023

What changes were proposed in this pull request?

Implement server support and scala client for reattachable execution with two new RPCs:

  • ReattachExecute
  • ReleaseExecute

With reattachable execution, if the response stream of the ExecutePlan RPC gets broken, it can be reattached to with the ReattachExecute RPC. The server now holds a small backtrack buffer, in case a client that comes back with a reattach after an error did not receive the latest sent responses and has to backtrack. The client can send ReleaseExecute RPCs to let the server know what has already been consumed and can be freed.

Server: ExecuteResponseObserver

If the ExecuteHolder is reattachable, ExecuteResponseObserver is modified to not throw away responses immediately after they are returned to ExecuteGrpcResponseSender, but to keep a buffer of spark.connect.execute.reattachable.observerRetryBufferSize (default 1 MB) of responses behind the last returned response. This allows a returning consumer to backtrack in case the last returned response did not successfully arrive at the client.
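As a rough illustration only (a minimal sketch with made-up names like `BacktrackBuffer` and `CachedResponse`, not the actual ExecuteResponseObserver code), the idea is to walk back from the last returned response until `retryBufferSize` bytes are retained and release everything older:

```scala
import scala.collection.mutable

// Illustrative record; the real cached responses also carry the proto message itself.
case class CachedResponse(index: Long, serializedByteSize: Long)

class BacktrackBuffer(retryBufferSize: Long) {
  private val responses = mutable.Map.empty[Long, CachedResponse]

  def add(r: CachedResponse): Unit = responses(r.index) = r

  // After response `lastReturnedIndex` has been handed to the sender, keep up to
  // retryBufferSize bytes of older responses for a possible backtrack and free the rest.
  def trimBehind(lastReturnedIndex: Long): Unit = {
    var i = lastReturnedIndex
    var keptSize = 0L
    while (i >= 1 && responses.contains(i) && keptSize < retryBufferSize) {
      keptSize += responses(i).serializedByteSize
      i -= 1
    }
    while (i >= 1 && responses.contains(i)) {
      responses.remove(i)
      i -= 1
    }
  }
}
```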

Server: ExecuteGrpcResponseSender

If the ExecuteHolder is reattachable, ExecuteGrpcResponseSender is modified to gracefully end the RPC response stream after a given time (spark.connect.execute.reattachable.senderMaxStreamDuration, default 5 minutes) or size of responses (spark.connect.execute.reattachable.senderMaxStreamSize, default 1 GB). After that the response stream is completed, and the client needs to come back with a ReattachExecute RPC to continue. Clients know whether it's the real end of the results, because ExecuteThreadRunner now puts a marker ResultComplete message at the actual end of the stream.
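Conceptually (a hedged sketch with hypothetical helpers, not the real ExecuteGrpcResponseSender), the sending loop stops either when the ResultComplete marker has gone out or when the time/size budget is exhausted, and completes the gRPC stream in both cases:

```scala
import io.grpc.stub.StreamObserver

object GracefulSenderSketch {
  // `next` returns the next (response, serializedSize), or None once ResultComplete
  // has been emitted; both it and the limit parameters are illustrative assumptions.
  def sendLoop[T](
      next: () => Option[(T, Long)],
      observer: StreamObserver[T],
      maxStreamDurationMs: Long,
      maxStreamSize: Long): Unit = {
    val deadline = System.currentTimeMillis() + maxStreamDurationMs
    var sentBytes = 0L
    var done = false
    while (!done && System.currentTimeMillis() < deadline && sentBytes < maxStreamSize) {
      next() match {
        case Some((response, size)) =>
          observer.onNext(response)
          sentBytes += size
        case None =>
          done = true // real end of results: ResultComplete was the last response sent
      }
    }
    // Complete the RPC either way; if `done` is still false, the client is expected
    // to come back with ReattachExecute and continue from the last response it got.
    observer.onCompleted()
  }
}
```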

Cutting the stream manually in a graceful way like that can help prevent errors related to long open streams. It also helps with long-held idle streams that may get broken because of idleness. The client is able to handle these errors, but this also provides a cleaner way to minimize the number of network errors.

ExecuteGrpcResponseSender also implements flow control of responses sent to the gRPC stream. Normally, grpcResponseObserver.onNext would never block. That doesn't, however, mean that all responses are sent immediately: responses can be queued if the client does not keep up receiving and processing them. That queue can grow unbounded, and we don't know what was actually sent, so we don't know how much of a buffer ExecuteResponseObserver really needs to keep. Flow control utilizes ServerCallStreamObserver, which provides a way to check whether the stream is actually ready to send data, and a callback to be notified when it becomes ready. ExecuteGrpcResponseSender waits to call grpcResponseObserver.onNext until the stream becomes ready.
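A minimal sketch of that flow-control pattern (an illustrative wrapper, not the actual sender; the wait/notify structure and timeout are assumptions):

```scala
import io.grpc.stub.ServerCallStreamObserver

class ReadyAwareSender[T](observer: ServerCallStreamObserver[T]) {
  private val lock = new Object
  // gRPC invokes this callback on its own thread when the stream becomes writable again.
  observer.setOnReadyHandler(() => lock.synchronized { lock.notifyAll() })

  def send(response: T): Unit = {
    lock.synchronized {
      while (!observer.isReady && !observer.isCancelled) {
        lock.wait(1000) // wake up periodically; a deadline check could also live here
      }
    }
    if (!observer.isCancelled) {
      observer.onNext(response) // only called once the stream is actually ready
    }
  }
}
```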

This also helps with clients that abandon their result stream. Now we block before sending the responses, and time out the RPC after senderMaxStreamDuration, instead of pushing the responses to a gRPC outbound queue that is outside our control. The (TODO) server mechanism would then remove the execution once it has no RPC attached. Note: in such a situation, responses will still pile up in ExecuteResponseObserver; there is unfortunately no flow control of the Spark task results being pushed to it.

Client: ExecutePlanResponseReattachableIterator

The client wraps the whole implementation inside ExecutePlanResponseReattachableIterator. It's a wrapper that starts with an ExecutePlan result iterator, but if that iterator either finishes without a ResultComplete message or an intermittent network error occurs, it can acquire a new iterator to continue where it left off: by tracking the last received response, it can request a new iterator from ReattachExecute that starts right after it.
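The idea can be sketched as follows (illustrative callbacks standing in for the real gRPC stubs and proto fields; this is not the actual client code, which handles retries and edge cases more carefully):

```scala
import io.grpc.StatusRuntimeException

class ReattachingIterator[R](
    executePlan: () => Iterator[R],                  // initial ExecutePlan stream
    reattachExecute: Option[String] => Iterator[R],  // new stream after the given response id
    responseId: R => String,
    isResultComplete: R => Boolean)
  extends Iterator[R] {

  private var lastResponseId: Option[String] = None
  private var resultComplete = false
  private var current: Iterator[R] = executePlan()

  override def hasNext: Boolean = {
    if (resultComplete) return false
    val streamHasNext =
      try current.hasNext
      catch {
        case _: StatusRuntimeException =>
          // Intermittent network error: reattach after the last response we received.
          current = reattachExecute(lastResponseId)
          current.hasNext
      }
    if (streamHasNext) {
      true
    } else {
      // Stream ended without ResultComplete: the server cut it gracefully, so request
      // a new stream that starts right after the last received response.
      current = reattachExecute(lastResponseId)
      current.hasNext
    }
  }

  override def next(): R = {
    val response = current.next()
    lastResponseId = Some(responseId(response))
    if (isResultComplete(response)) resultComplete = true
    response
  }
}
```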

The client sends ReleaseExecute RPCs asynchronously to release the responses it has already consumed (potentially faster than the ExecuteResponseObserver would release them from its backtrack buffer). The client doesn't need to check their result or block on them. If something fails, the executions will be cleaned up by the (TODO) cleanup mechanism on the server.

TODO as followup

  • What remains TODO for a followup PR is a server mechanism for dealing with executions that have been abandoned: the client did not come back with ReattachExecute and did not free the execution with ReleaseExecute. This will be done by an operation manager that monitors all active executions, and interrupts and removes the ones that were not released but have had no client RPC attached to them for a while.
  • Python client implementation is a followup.

Why are the changes needed?

Robust handling of long-running queries in the face of intermittent network glitches.

Does this PR introduce any user-facing change?

The re-attachable execution mechanism is not exposed in any public API. Several internal configs have been added:

Connect server configs:

  • spark.connect.execute.reattachable.senderMaxStreamDuration
  • spark.connect.execute.reattachable.senderMaxStreamSize
  • spark.connect.execute.reattachable.observerRetryBufferSize

Connect scala client configuration element: useReattachableExecute

How was this patch tested?

The existing E2E suites utilize reattachable execution. The tests have been configured to exercise reattaching by using an extremely low senderMaxStreamDuration and senderMaxStreamSize.
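For illustration, a hedged sketch of what forcing frequent reattaches could look like (the values and the exact wiring in the E2E suites are assumptions):

```scala
import org.apache.spark.SparkConf

// Both limits far below their defaults, so every query goes through ReattachExecute.
val conf = new SparkConf()
  .set("spark.connect.execute.reattachable.senderMaxStreamDuration", "1s")
  .set("spark.connect.execute.reattachable.senderMaxStreamSize", "100k")
// On the Scala client side, the useReattachableExecute configuration element controls
// whether ExecutePlanResponseReattachableIterator is used at all.
```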

request: proto.ExecutePlanRequest,
channel: ManagedChannel,
retryPolicy: GrpcRetryHandler.RetryPolicy)
extends java.util.Iterator[proto.ExecutePlanResponse]
Member

Should it be CloseableIterator instead?

Contributor Author

The regular iterator returned from regular executePlan is not a closeable iterator, and grpc-java doesn't seem to play nicely with closeable iterators in general: grpc/grpc-java#2409
I think the best I can do in Java is to make sure that internal users in the client consume it fully... and for places that hand control over to the user (like Dataset.toLocalIterator) there's no good way to control that.
If an iterator is left open and idle:

  • the server will close the RPC stream after 5 minutes (STREAM_MAX_TIME)
  • then the server will tear down executions without attached RPC stream after another 5 minutes (TODO)

(retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
.pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
rawAsyncStub.releaseExecute(requestForRetry,
createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))
Member

Quick question: why do we do this with a StreamObserver? Can't we just retry with the retry you defined below?

Contributor Author

We want the client to call releaseExecute asynchronously and not block on getting an ack that it executed before continuing with the results. retry would be a blocking iteration... unless I mixed something up and it could be used for async as well.
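For illustration, a hedged sketch of that fire-and-forget pattern (illustrative names and backoff values, not the actual client code): the release call is made on the async stub, and a retry is scheduled from the observer's onError, so the thread consuming results never blocks on the ack:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import io.grpc.stub.StreamObserver

object AsyncReleaseSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def releaseAsync[Req, Resp](
      call: (Req, StreamObserver[Resp]) => Unit,  // e.g. an async stub's releaseExecute
      request: Req,
      attempt: Int = 0,
      maxAttempts: Int = 3,
      initialBackoffMs: Long = 200L): Unit = {
    call(request, new StreamObserver[Resp] {
      override def onNext(value: Resp): Unit = ()   // nothing to do with the ack
      override def onCompleted(): Unit = ()
      override def onError(t: Throwable): Unit = {
        if (attempt + 1 < maxAttempts) {
          val backoffMs = initialBackoffMs * math.pow(2, attempt).toLong
          scheduler.schedule(new Runnable {
            override def run(): Unit =
              releaseAsync(call, request, attempt + 1, maxAttempts, initialBackoffMs)
          }, backoffMs, TimeUnit.MILLISECONDS)
        }
        // else give up: the server-side cleanup (TODO follow-up) reclaims the execution.
      }
    })
  }
}
```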

Contributor

@grundprinzip grundprinzip left a comment

Did a first pass on the server side.

ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
.internal()
.doc("For reattachable execution, after this amount of time the response stream will be " +
"automatically completed and client needs to send a new ReattachExecute RPC to continue. " +
Contributor

Completed is a weird term here. But I don't have better suggestions immediately either.

Contributor Author

The "completed" here comes from the server calling onCompleted on the stream... couldn't find a better word either.

var i = lastSentIndex
var totalResponsesSize = 0L
while (i >= 1 && responses.get(i).isDefined && totalResponsesSize < retryBufferSize) {
totalResponsesSize += responses.get(i).get.serializedByteSize
Contributor

Is the total response size only updated when we remove the items?

Contributor Author

This is indeed calculated on the fly when removing, iterating back from the returned response until retryBufferSize worth of responses is kept. This could be smarter: we could keep a "second finger" on the oldest buffered response and cache the total size between it and the highest consumed one (our "first finger"), but that actually gets a bit more complicated when the client backtracks after a reattach (so our "first finger" moves back).

Contributor Author

Given that it's a total of 1 MB of responses, even if they were tiny (100 bytes?), it's an iteration over at most thousands of elements, and usually just a few, so I thought it wasn't worth the complication for the optimization.

@github-actions github-actions bot added the DOCS label Jul 31, 2023
@juliuszsompolski juliuszsompolski changed the title [SPARK-44421][CONNECT] Reattachable execution in Spark Connect [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect Aug 1, 2023
@github-actions github-actions bot added the CORE label Aug 1, 2023
@@ -149,7 +149,7 @@ class SparkThrowableSuite extends SparkFunSuite {
checkIfUnique(messageFormats)
}

test("Error classes match with document") {
ignore("Error classes match with document") {
Member

https://github.com/juliuszsompolski/apache-spark/actions/runs/5720841151/job/15504640463

This fails. Since I am also actively working on this item, I will just merge with this test ignored for now. Should be easy to fix. I will leave it to you @juliuszsompolski

@HyukjinKwon
Member

Merged to master and branch-3.5.

HyukjinKwon added a commit that referenced this pull request Aug 1, 2023
@ueshin
Member

ueshin commented Aug 1, 2023

Hi, this seems to break the 3.5 build.

[error] /home/runner/work/spark/spark/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:328:9: not found: value logDebug
[error]         logDebug(s"Session not found: ($userId, $sessionId)")
[error]         ^
[error] one error found

@HyukjinKwon
Member

Sure, made a followup: #42254

@juliuszsompolski
Contributor Author

juliuszsompolski commented Aug 1, 2023 via email

HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…ing execute in Spark Connect

### What changes were proposed in this pull request?

This PR proposes to implement the Python client side for #42228.

Basically this PR applies the same changes made to `ExecutePlanResponseReattachableIterator` and `SparkConnectClient` to PySpark, for symmetry.

### Why are the changes needed?

To enable the same feature in #42228

### Does this PR introduce _any_ user-facing change?

Yes, see #42228.

### How was this patch tested?

Existing unit tests, because the feature is enabled by default. Also, manual E2E tests.

Closes #42235 from HyukjinKwon/SPARK-44599.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024