
[SPARK-44424][CONNECT][PYTHON] Python client for reattaching to existing execute in Spark Connect #42235

Closed
wants to merge 20 commits into master from HyukjinKwon/SPARK-44599

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Jul 31, 2023

What changes were proposed in this pull request?

This PR proposes to implement the Python client side for #42228.

Basically, this PR applies the same changes made to ExecutePlanResponseReattachableIterator and SparkConnectClient in #42228 to PySpark, for symmetry.

Why are the changes needed?

To enable the same feature as #42228 in the Python client.

Does this PR introduce any user-facing change?

Yes, see #42228.

How was this patch tested?

Existing unit tests, since the feature is enabled by default. Also, manual E2E tests.

@juliuszsompolski
Contributor

This is already tracked by https://issues.apache.org/jira/browse/SPARK-44424 as well.

@HyukjinKwon HyukjinKwon changed the title [SPARK-44599][CONNECT][PYTHON] Python client for reattaching to existing execute in Spark Connect [SPARK-44424][CONNECT][PYTHON] Python client for reattaching to existing execute in Spark Connect Aug 1, 2023
@HyukjinKwon HyukjinKwon force-pushed the SPARK-44599 branch 2 times, most recently from 1d99fd2 to f4ea862 Compare August 1, 2023 08:50
@juliuszsompolski juliuszsompolski left a comment

Looks good to me, but someone who knows more Python should review.

Question: are there Python tests that work E2E, i.e. connect the client to a separate server?
Could we pass configs like in https://github.com/apache/spark/pull/42228/files#diff-c389e865d11bc54f29d61207c50ced1f9561de1488865eaf11caf789dd6003c2R144 for the testing, so that E2E testing exercises reattaching?
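For reference, a minimal sketch of what such an E2E setup could look like; this assumes a test that starts its own local Spark Connect server via SparkSession.builder, and the config names are taken from #42228:

```python
# Sketch only: assumes a test that launches its own local Spark Connect server.
# The config names come from #42228; the tiny values force frequent stream
# cut-offs so that end-to-end tests actually exercise the reattach path.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.remote("local[*]")
    .config("spark.connect.execute.reattachable.senderMaxStreamDuration", "1s")
    .config("spark.connect.execute.reattachable.senderMaxStreamSize", "123")
    .getOrCreate()
)

# Any query whose results span multiple responses will now go through reattach.
spark.range(1_000_000).selectExpr("sum(id)").collect()
```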

Comment on lines +222 to +234
def close(self) -> None:
    return super().close()
Contributor

On close, it should do release_all if it wasn't done already.
In Scala the Iterator unfortunately doesn't have a close, but since Python has one, it should do it here.
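A minimal sketch of that suggestion; `_result_complete` and `_release_all()` are illustrative member names on the reattachable iterator, not necessarily the final implementation:

```python
# Inside ExecutePlanResponseReattachableIterator (sketch; member names illustrative):
def close(self) -> None:
    # Release the server-side execution before closing, unless the final
    # response was already consumed and everything was released then.
    if not self._result_complete:
        self._release_all()
    return super().close()
```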

# Configure logging for the SparkConnect client.

def disable_reattachable_execute(self) -> "SparkConnectClient":
Contributor

I think this will probably need some flagging to prevent it from being toggled while a query is currently being executed, so that we know what to do. Or perhaps the information returned from the query is enough to independently determine whether it needs to be resumed.

Contributor

This is used only when creating ExecutePlanResponseReattachableIterator, and once that's created, it will know what to do even if this gets flipped.
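For illustration, a hedged sketch of that behavior; the attribute and constructor names only approximate the PR and are not authoritative:

```python
# Sketch: the flag is read once, when an execution starts. The iterator created
# here carries its own reattach/release logic, so flipping the flag afterwards
# does not affect an execution that is already in flight.
def _execute_as_iterator(self, req):
    if self._use_reattachable_execute:
        return ExecutePlanResponseReattachableIterator(
            req, self._stub, self._retry_policy, self._builder.metadata()
        )
    return self._stub.ExecutePlan(req, metadata=self._builder.metadata())
```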

assert ret is not None

self._last_returned_response_id = ret.response_id
if ret.result_complete:
Contributor

I would add a comment noting that this assumes the field is part of the oneof and is None if it's not set in this response.

Member Author

Actually, I just used a different method to make this clear (will push the changes soon).
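One way to make that explicit is protobuf's HasField, which is unambiguous for oneof members; a sketch of the shape (member names illustrative, not necessarily what was pushed):

```python
# Sketch: result_complete lives in the response's oneof, so HasField() cleanly
# distinguishes "not set in this response" from "set but empty".
self._last_returned_response_id = ret.response_id
if ret.HasField("result_complete"):
    self._result_complete = True
    self._release_all()  # illustrative: the stream is finished, release everything
```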

return ret

def _has_next(self) -> bool:
    from pyspark.sql.connect.client.core import SparkConnectClient
Contributor

why the import here?

Member Author

Importing it at module level causes a circular import (because SparkConnectClient also imports ExecutePlanResponseReattachableIterator).
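In other words, the import is deferred into the method so that the other module is fully initialized by the time it runs; a short annotated sketch of the pattern:

```python
# core.py imports this module at import time, so a module-level import of core
# here would form a cycle. A function-local import runs only on the first call,
# when both modules are already fully loaded.
def _has_next(self) -> bool:
    from pyspark.sql.connect.client.core import SparkConnectClient

    ...
```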

    return False
else:
    first_try = True
    for attempt in Retrying(
Contributor

attempt is of type AttemptManager, which contains a _retry_state that has a _count. We can make the retry state visible in the attempt manager if you want to avoid having a first_try variable.

Member Author

Yup, this is where this context manager shines :-)

        can_retry=SparkConnectClient.retry_exception, **self._retry_policy
    ):
        with attempt:
            if first_try:
Contributor

or maybe attempt.first_try (added to attempt manager)
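A self-contained sketch of that suggestion, using a hypothetical attempt manager that exposes whether the current attempt is the first one; this is not pyspark's actual AttemptManager, just an illustration of the shape:

```python
# Illustration only: a toy attempt manager exposing `first_try`, so the calling
# loop would not need to track its own flag. Pyspark's real Retrying /
# AttemptManager differ in details (retry policy, error classification, sleeps).
class Attempt:
    def __init__(self, index: int):
        self.index = index

    @property
    def first_try(self) -> bool:
        return self.index == 0

    def __enter__(self) -> "Attempt":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # A real implementation would swallow retriable errors and re-raise others.
        return False


def retrying(max_attempts: int = 3):
    for i in range(max_attempts):
        yield Attempt(i)


# Usage: no external first_try variable needed.
for attempt in retrying():
    with attempt:
        if attempt.first_try:
            pass  # e.g. keep consuming the existing ExecutePlan iterator
        else:
            pass  # e.g. reattach to the existing execution
        break
```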

self._result_complete = False

# Initial iterator comes from ExecutePlan request.
self._iterator: Iterator[pb2.ExecutePlanResponse] = self._stub.ExecutePlan(
Contributor

So if the target resource throws a retriable error, this will already raise an exception and bypass the retry logic. It looks like the current retry handling can only deal with subsequent errors while consuming the response stream, but not with the initial error itself.

Contributor

@juliuszsompolski is this the same for the current Scala client?

Contributor

At least in Scala, calling executePlan never throws an error, even if the service is completely unavailable and one would expect an immediate error. I tested this by killing the server entirely and then calling executePlan, and I never got an actual error before trying to call next(); the error is only thrown on the first next() or hasNext() call of the resulting iterator. So in Scala it is fine to not catch and retry here but at the first iterator invocation instead, which is why https://github.com/apache/spark/pull/42228/files#diff-2efc03ab3edd5e5f78652b1454b31fd4917113de09a64fbdc7352f150c685628R89 and also https://github.com/apache/spark/pull/41829/files#diff-79d952ace19b8b9d4b04e010653aba35e4aeaca957946cc68414b7a766deea99R68 do not wrap the first call in a retry.

But it's a good callout that we need to check whether Python gRPC behaves the same, or maybe just defensively wrap it in a retry anyway (and do the same in Scala, in case this is a gRPC internal quirk that we shouldn't depend on), or at least it deserves a comment.

Contributor

I raised #42274 to be defensive about it in Scala.

Contributor

... but what I wrote here implies that if the error was indeed during the initial ExecutePlan, which didn't reach the server, and we only catch it on the first hasNext() later, then Reattach won't work: the query won't exist on the server, so it will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. So this indeed means it won't work if a retry of the initial ExecutePlan is needed, and wrapping the call here in a retry won't help with that.

Contributor

Retrying a failure of the initial operation start is tricky, because:

  • if the initial operation reached the server, a retry of ExecutePlan would fail with INVALID_HANDLE.OPERATION_ALREADY_EXISTS; in that case ReattachExecute should be used, and that is what happens here.
  • but if the initial operation didn't reach the server, then ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. This could be improved by switching back to retrying the ExecutePlan in that case, as sketched below.
    But I think this is a corner case that can be a follow-up, or we can decide it's not worth it.

I believe that in the current state we will get an INVALID_HANDLE.OPERATION_NOT_FOUND error if the original ExecutePlan never reached the server. We could improve that by calling ExecutePlan again, but that would have to be caught not here, but during the next() calls.
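A hedged sketch of that possible follow-up (not part of this PR); helper and attribute names such as `_create_reattach_execute_request` and `_initial_request` are assumed for illustration, and the error-class check is simplified to a substring match:

```python
import grpc

# Sketch: if reattaching fails because the server never saw the original
# ExecutePlan, and nothing has been received yet, fall back to re-issuing
# ExecutePlan instead of surfacing INVALID_HANDLE.OPERATION_NOT_FOUND.
def _reattach_or_restart(self):
    try:
        return self._stub.ReattachExecute(self._create_reattach_execute_request())
    except grpc.RpcError as e:
        not_found = "INVALID_HANDLE.OPERATION_NOT_FOUND" in str(e)
        if not_found and self._last_returned_response_id is None:
            # Safe to start over: no responses were ever consumed.
            return self._stub.ExecutePlan(self._initial_request)
        raise
```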

Contributor

So now I have 3 PRs:

I would suggest merging the first, and discussing this corner case later.

# there is more, and we need to reattach. While ResponseComplete didn't
# arrive, we keep reattaching.
first_loop = True
if not has_next and not self._result_complete:
Contributor

Just readability.

Suggested change
- if not has_next and not self._result_complete:
+ if not self._result_complete and not has_next:

first_loop = True
if not has_next and not self._result_complete:
    while not has_next or first_loop:
        self._iterator = self._stub.ReattachExecute(
Contributor

This call could itself raise a retriable error.

Contributor

Like in https://github.com/apache/spark/pull/42235/files#r1281120432, in Scala it wouldn't, but we could be defensive about it.
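A short sketch of that defensive wrapping, mirroring #42274 on the Scala side and reusing the Retrying helper already used in this iterator; `_create_reattach_execute_request` is an assumed helper name:

```python
# Sketch: create the new stream inside the same retry helper used elsewhere in
# this iterator, so an error raised by ReattachExecute itself (not only while
# consuming the stream) is also treated as retriable.
for attempt in Retrying(
    can_retry=SparkConnectClient.retry_exception, **self._retry_policy
):
    with attempt:
        self._iterator = self._stub.ReattachExecute(
            self._create_reattach_execute_request()
        )
```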

@HyukjinKwon HyukjinKwon marked this pull request as ready for review August 2, 2023 02:29
@HyukjinKwon HyukjinKwon force-pushed the SPARK-44599 branch 6 times, most recently from d040c77 to eb392ab Compare August 2, 2023 09:49
@apache apache deleted a comment from juliuszsompolski Aug 2, 2023
@HyukjinKwon
Member Author

(oops mistakenly removed the comment after applying the change)

@juliuszsompolski juliuszsompolski force-pushed the SPARK-44599 branch 2 times, most recently from 9e04420 to f1c59e6 Compare August 2, 2023 18:49
hvanhovell pushed a commit that referenced this pull request Aug 2, 2023
### What changes were proposed in this pull request?

Function `removeResponsesUntilId` is used by the `ReleaseExecute` RPC, and it needs to be synchronized against `removeCachedResponses` running from `consumeResponse` for the `ExecutePlan` or `ReattachExecute` RPCs.

In general, all public accesses to ExecuteResponseObserver are best synchronized.

### Why are the changes needed?

Fix a synchronization bug caught by testing of the Python client.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Caught in #42235, but fixed separately because this is a server-side change.

Closes #42299 from juliuszsompolski/SPARK-44637.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
hvanhovell pushed a commit that referenced this pull request Aug 2, 2023
Closes #42299 from juliuszsompolski/SPARK-44637.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 26c7e55)
Signed-off-by: Herman van Hovell <herman@databricks.com>
@juliuszsompolski juliuszsompolski force-pushed the SPARK-44599 branch 2 times, most recently from 7cd1941 to 68eed71 Compare August 2, 2023 19:23
Co-authored-by: Juliusz Sompolski <julek@databricks.com>
@HyukjinKwon
Member Author

Merged to master and branch-3.5.

HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…ing execute in Spark Connect

Closes #42235 from HyukjinKwon/SPARK-44599.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 68d8e65)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…aries after checking dependencies

### What changes were proposed in this pull request?

This PR is a follow-up of #42235 that fixes the Connect-related imports to happen after the dependency check.

### Why are the changes needed?

In order to show end users a nice error message about missing optional dependencies, instead of just saying that the modules were not found.

### Does this PR introduce _any_ user-facing change?

No, the change has not been released yet.

### How was this patch tested?

Manually.

Closes #42311 from HyukjinKwon/SPARK-44424-folowup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…aries after checking dependencies

Closes #42311 from HyukjinKwon/SPARK-44424-folowup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 7b9057b)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Comment on lines +175 to +176
conf.set("spark.connect.execute.reattachable.senderMaxStreamDuration", "1s")
conf.set("spark.connect.execute.reattachable.senderMaxStreamSize", "123")
Contributor

@HyukjinKwon I saw this error when running unit tests locally: org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.connect.execute.reattachable.senderMaxStreamDuration.

Member Author

That should be fixed in the latest master branch.

@HyukjinKwon HyukjinKwon deleted the SPARK-44599 branch January 15, 2024 00:48
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
Closes apache#42299 from juliuszsompolski/SPARK-44637.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…ing execute in Spark Connect

Closes apache#42235 from HyukjinKwon/SPARK-44599.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…aries after checking dependencies

Closes apache#42311 from HyukjinKwon/SPARK-44424-folowup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>