[SPARK-44424][CONNECT][PYTHON] Python client for reattaching to existing execute in Spark Connect #42235
Conversation
It's tracked by https://issues.apache.org/jira/browse/SPARK-44424 already as well.
Looks good to me, but someone who knows more Python should review.
Question: are there Python tests that work E2E, i.e. connect the client to a separate server?
Could we pass configs like in https://github.com/apache/spark/pull/42228/files#diff-c389e865d11bc54f29d61207c50ced1f9561de1488865eaf11caf789dd6003c2R144 for the testing, so that E2E testing exercises reattaching?
def close(self) -> None:
    return super().close()
On close, it should do release_all if that wasn't done already.
In Scala the Iterator unfortunately doesn't have a close, but since Python has one, it should do it there.
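The suggestion above can be sketched as follows. This is a minimal toy model, not the real client: `_release_all` and `_released` are hypothetical names standing in for the ReleaseExecute plumbing.

```python
class ReattachableIterator:
    """Minimal sketch: an iterator whose close() releases server-side
    results exactly once. Names here are hypothetical."""

    def __init__(self, responses):
        self._responses = iter(responses)
        self._released = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._responses)

    def _release_all(self):
        # In the real client this would send a ReleaseExecute RPC.
        self._released = True

    def close(self):
        # Release buffered responses if it wasn't already done.
        if not self._released:
            self._release_all()


it = ReattachableIterator([1, 2, 3])
list(it)
it.close()
it.close()  # idempotent: the second call is a no-op
```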
# Configure logging for the SparkConnect client.

def disable_reattachable_execute(self) -> "SparkConnectClient":
I think this will probably need some flagging to avoid it being done while a query is currently being executed, so that we know what to do. Or the information returned from the query may be enough to independently understand whether it needs to be resumed.
This is used only when creating ExecutePlanResponseReattachableIterator, and once that's created, it will know what to do even if this gets flipped.
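The point about the flag being captured at creation time can be shown with a toy model (hypothetical names, not the real `SparkConnectClient` API): the iterator reads the flag once in its constructor, so flipping it afterwards does not affect an execution already in flight.

```python
class FakeClient:
    """Toy stand-in for the client holding the reattachable flag."""

    def __init__(self):
        self._use_reattachable = True

    def disable_reattachable_execute(self):
        self._use_reattachable = False
        return self


class FakeExecuteIterator:
    def __init__(self, client):
        # The flag is captured once at creation time; later flips on the
        # client don't change the behavior of this iterator.
        self._reattachable = client._use_reattachable


client = FakeClient()
it = FakeExecuteIterator(client)
client.disable_reattachable_execute()  # flipped after creation
print(it._reattachable)  # still True
```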
assert ret is not None

self._last_returned_response_id = ret.response_id
if ret.result_complete:
I would add a comment that this assumes result_complete is part of the oneof, and reads as unset when it's not set in this response.
Actually I just used a different method to make this clear. (will push them soon)
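The subtlety in this thread is protobuf presence semantics: reading a oneof member yields a default value even when the field isn't set, so code must check presence explicitly (protobuf's `HasField`). A stand-in class illustrating the distinction, not the real `ExecutePlanResponse`:

```python
class FakeResponse:
    """Stand-in mimicking protobuf presence semantics for a oneof member."""

    def __init__(self, **fields):
        self._set_fields = dict(fields)

    def HasField(self, name):
        return name in self._set_fields

    def __getattr__(self, name):
        # Like protobuf, reading an unset field yields a default value
        # rather than raising.
        return self._set_fields.get(name, False)


done = FakeResponse(result_complete=True)
pending = FakeResponse()

# Reading the attribute alone can't distinguish "unset" from "False":
print(pending.result_complete)              # False (default, but not set)
print(pending.HasField("result_complete"))  # False: presence check works
print(done.HasField("result_complete"))     # True
```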
return ret

def _has_next(self) -> bool:
    from pyspark.sql.connect.client.core import SparkConnectClient
why the import here?
A top-level import causes a circular import (because SparkConnectClient also imports ExecutePlanResponseReattachableIterator).
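The deferred-import pattern can be demonstrated with two toy modules (names are made up, mirroring the SparkConnectClient / ExecutePlanResponseReattachableIterator cycle): a top-level `from core_mod import ...` in the iterator module would fail while core_mod is still half-initialized, but a function-local import resolves at call time, when both modules are complete.

```python
import os
import sys
import tempfile

# Two hypothetical modules that depend on each other.
sources = {
    "core_mod.py": (
        "import iter_mod  # core pulls in the iterator module at top level\n"
        "def retry_exception(e):\n"
        "    return False\n"
    ),
    "iter_mod.py": (
        "def has_next():\n"
        "    # Deferred import: by call time core_mod is fully initialized.\n"
        "    # A top-level 'from core_mod import retry_exception' would fail\n"
        "    # here, since core_mod imports this module first.\n"
        "    from core_mod import retry_exception\n"
        "    return retry_exception(None) is False\n"
    ),
}

tmpdir = tempfile.mkdtemp()
for name, body in sources.items():
    with open(os.path.join(tmpdir, name), "w") as f:
        f.write(body)
sys.path.insert(0, tmpdir)

import core_mod  # importing core also imports iter_mod

print(core_mod.iter_mod.has_next())  # True
```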
    return False
else:
    first_try = True
    for attempt in Retrying(
attempt is of type AttemptManager, which contains a _retry_state that has a _count. We can make the retry state visible in the AttemptManager if you want to avoid having a first_try variable.
Yup, this is the time that this context manager shines :-)
    can_retry=SparkConnectClient.retry_exception, **self._retry_policy
):
    with attempt:
        if first_try:
or maybe attempt.first_try (added to AttemptManager)
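A simplified sketch of that suggestion (the real `Retrying`/`AttemptManager` in pyspark differ; this only shows the idea of exposing the retry count on the attempt so the caller doesn't need its own first_try variable):

```python
class RetryState:
    def __init__(self):
        self._count = 0


class AttemptManager:
    """Context manager for one attempt; exposes whether it is the first."""

    def __init__(self, retry_state):
        self._retry_state = retry_state

    @property
    def first_try(self):
        return self._retry_state._count == 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            self._retry_state._count += 1
            return True  # swallow the error so the loop can retry
        return False


def retrying(max_attempts=3):
    state = RetryState()
    for _ in range(max_attempts):
        yield AttemptManager(state)


seen = []
for attempt in retrying():
    with attempt:
        seen.append(attempt.first_try)
        if attempt.first_try:
            raise RuntimeError("transient failure on the first try")
print(seen)  # [True, False, False]
```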
self._result_complete = False

# Initial iterator comes from ExecutePlan request.
self._iterator: Iterator[pb2.ExecutePlanResponse] = self._stub.ExecutePlan(
So if the target resource throws a retriable error, this will already raise an exception and break the retry logic. It looks like the current retry handling can only deal with subsequent errors while handling the response stream, but not with the initial error itself.
@juliuszsompolski is this the same for the current scala client?
At least in Scala, calling executePlan never throws an error, even if the service is completely unavailable and one could expect an immediate error; I tested that by completely killing the server, then calling executePlan, and never got an actual error before trying to call next(). The error is only thrown on the first next() or hasNext() call of the resulting iterator. So in Scala, not catching and retrying here but at the first iterator invocation is fine; that's why in https://github.com/apache/spark/pull/42228/files#diff-2efc03ab3edd5e5f78652b1454b31fd4917113de09a64fbdc7352f150c685628R89 and also in https://github.com/apache/spark/pull/41829/files#diff-79d952ace19b8b9d4b04e010653aba35e4aeaca957946cc68414b7a766deea99R68 we do not wrap the first call in a retry.
But it's a good callout that it needs checking whether Python gRPC behaves the same, or maybe we should just defensively wrap it in a retry anyway (and do the same in Scala, in case this is a gRPC internal quirk that we shouldn't depend on); at the very least it deserves a comment.
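The deferred-error behavior described above can be reproduced with a plain Python generator, which is roughly how a streaming stub behaves: calling it builds the iterator without running the body, so a failure only surfaces on first consumption. (This is a toy stand-in, not the actual gRPC stub.)

```python
def execute_plan():
    """Stand-in for a streaming RPC call against a dead server."""
    def stream():
        # A generator body doesn't run until the first next(); the
        # connection failure is therefore deferred, as described above.
        raise ConnectionError("server unreachable")
        yield  # unreachable; makes this function a generator
    return stream()


it = execute_plan()  # no error yet, even though the "server" is down
try:
    next(it)  # the failure surfaces only here
    failed = False
except ConnectionError:
    failed = True
print(failed)  # True
```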
I raised #42274 to be defensive about it in scala.
... but what I wrote here implies that if the error was indeed during the initial ExecutePlan that didn't reach the server, and we catch it on the first hasNext() later, then Reattach won't work, because the query won't be on the server, so it will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. So this indeed won't work in the case where a retry of the initial ExecutePlan is needed, and wrapping that call in a retry here won't help with that either.
Retrying a failure of the initial operation start is tricky, because:
- if the initial operation reached the server, a retry of ExecutePlan would fail with INVALID_HANDLE.OPERATION_ALREADY_EXISTS; in that case ReattachExecute should be used, and this is what happens here.
- but if the initial operation didn't reach the server, then ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. This could be improved by switching back to retrying ExecutePlan in that case.
But I think this is a border case that can be a followup, or we can decide it's not worth it.
I believe that in the state as-is, we will get an INVALID_HANDLE.OPERATION_NOT_FOUND error in case the original ExecutePlan never reached the server. We could improve that by calling ExecutePlan again, but that would have to be caught not here, but during the next() calls.
So now I have 3 PRs:
- [SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements #42281 adds some comments in Scala explaining that gRPC defers throwing errors to the iterator functions, and the limitation of the retry when the original execute didn't reach the server.
- [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server #42282 fixes the retry in case the original execute didn't reach the server.
- [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server overkill #42274 does that, plus adds overkill retries in case the assumption that calling executePlan and reattachExecute won't throw is wrong.
I would suggest merging the first, and discussing this corner case later.
# there is more, and we need to reattach. While ResponseComplete didn't
# arrive, we keep reattaching.
first_loop = True
if not has_next and not self._result_complete:
Just readability.
Suggested change:
- if not has_next and not self._result_complete:
+ if not self._result_complete and not has_next:
first_loop = True
if not has_next and not self._result_complete:
    while not has_next or first_loop:
        self._iterator = self._stub.ReattachExecute(
This could raise itself with a retriable error.
Like in https://github.com/apache/spark/pull/42235/files#r1281120432, in scala it wouldn't, but we could be defensive about it.
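The overall reattach loop being reviewed can be modeled with plain lists standing in for successive response streams: each "reattach" yields a fresh stream, and reattachment continues until a ResultComplete marker is seen. The data shapes here are hypothetical, not the real protobuf messages:

```python
def consume_with_reattach(streams):
    """streams: a list of lists of (value, result_complete) pairs, where
    each inner list stands in for one ExecutePlan/ReattachExecute stream."""
    results = []
    complete = False
    pending = iter(streams)
    iterator = iter(next(pending))
    while not complete:
        for value, result_complete in iterator:
            results.append(value)
            if result_complete:
                complete = True
        if not complete:
            # Stream ended without ResultComplete: reattach for the rest.
            iterator = iter(next(pending))
    return results


# First stream is cut short; the second, "reattached" stream finishes.
print(consume_with_reattach([[(1, False), (2, False)], [(3, True)]]))
# [1, 2, 3]
```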
(oops, mistakenly removed the comment after applying the change)
### What changes were proposed in this pull request?
Function `removeResponsesUntilId` is used by the `ReleaseExecute` RPC, and that needs to be synchronized against `removeCachedResponses` running from `consumeResponse` for the `ExecutePlan` or `ReattachExecute` RPC. In general, all public accesses to ExecuteResponseObserver should best be synchronized.

### Why are the changes needed?
Fix a synchronization bug caught by testing of the Python client.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Caught in #42235, but want to fix separately because this is a server side change.

Closes #42299 from juliuszsompolski/SPARK-44637.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
Merged to master and branch-3.5.
…ing execute in Spark Connect

### What changes were proposed in this pull request?
This PR proposes to implement the Python client side for #42228. Basically this PR applies the same changes of `ExecutePlanResponseReattachableIterator` and `SparkConnectClient` to PySpark as the symmetry.

### Why are the changes needed?
To enable the same feature in #42228.

### Does this PR introduce _any_ user-facing change?
Yes, see #42228.

### How was this patch tested?
Existing unittests because it enables the feature by default. Also, manual E2E tests.

Closes #42235 from HyukjinKwon/SPARK-44599.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 68d8e65)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…aries after checking dependencies

### What changes were proposed in this pull request?
This PR is a followup of #42235 that fixes the Connect related import after the dependency checking.

### Why are the changes needed?
In order to show the end users a nice error message for optional dependencies, instead of just saying that they were not found.

### Does this PR introduce _any_ user-facing change?
No, the PR has not been released out yet.

### How was this patch tested?
Manually.

Closes #42311 from HyukjinKwon/SPARK-44424-folowup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
conf.set("spark.connect.execute.reattachable.senderMaxStreamDuration", "1s")
conf.set("spark.connect.execute.reattachable.senderMaxStreamSize", "123")
@HyukjinKwon I saw this error when running unit tests locally: org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.connect.execute.reattachable.senderMaxStreamDuration.
That should be fixed in the latest master branch.
What changes were proposed in this pull request?
This PR proposes to implement the Python client side for #42228. Basically, this PR applies the same changes of ExecutePlanResponseReattachableIterator and SparkConnectClient to PySpark, as the symmetry.

Why are the changes needed?
To enable the same feature as in #42228.

Does this PR introduce any user-facing change?
Yes, see #42228.

How was this patch tested?
Existing unittests, because it enables the feature by default. Also, manual E2E tests.