diff --git a/google/api_core/extended_operation.py b/google/api_core/extended_operation.py new file mode 100644 index 00000000..209f1213 --- /dev/null +++ b/google/api_core/extended_operation.py @@ -0,0 +1,206 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Futures for extended long-running operations returned from Google Cloud APIs. + +These futures can be used to synchronously wait for the result of a +long-running operations using :meth:`ExtendedOperation.result`: + +.. code-block:: python + + extended_operation = my_api_client.long_running_method() + + extended_operation.result() + +Or asynchronously using callbacks and :meth:`Operation.add_done_callback`: + +.. code-block:: python + + extended_operation = my_api_client.long_running_method() + + def my_callback(ex_op): + print(f"Operation {ex_op.name} completed") + + extended_operation.add_done_callback(my_callback) + +""" + +import threading + +from google.api_core import exceptions +from google.api_core.future import polling + + +class ExtendedOperation(polling.PollingFuture): + """An ExtendedOperation future for interacting with a Google API Long-Running Operation. + + Args: + extended_operation (proto.Message): The initial operation. + refresh (Callable[[], type(extended_operation)]): A callable that returns + the latest state of the operation. + cancel (Callable[[], None]): A callable that tries to cancel the operation. + retry: Optional(google.api_core.retry.Retry): The retry configuration used + when polling. This can be used to control how often :meth:`done` + is polled. Regardless of the retry's ``deadline``, it will be + overridden by the ``timeout`` argument to :meth:`result`. + + Note: Most long-running API methods use google.api_core.operation.Operation + This class is a wrapper for a subset of methods that use alternative + Long-Running Operation (LRO) semantics. + + Note: there is not a concrete type the extended operation must be. + It MUST have fields that correspond to the following, POSSIBLY WITH DIFFERENT NAMES: + * name: str + * status: Union[str, bool, enum.Enum] + * error_code: int + * error_message: str + """ + + def __init__( + self, extended_operation, refresh, cancel, retry=polling.DEFAULT_RETRY + ): + super().__init__(retry=retry) + self._extended_operation = extended_operation + self._refresh = refresh + self._cancel = cancel + # Note: the extended operation does not give a good way to indicate cancellation. + # We make do with manually tracking cancellation and checking for doneness. + self._cancelled = False + self._completion_lock = threading.Lock() + # Invoke in case the operation came back already complete. + self._handle_refreshed_operation() + + # Note: the following four properties MUST be overridden in a subclass + # if, and only if, the fields in the corresponding extended operation message + # have different names. + # + # E.g. we have an extended operation class that looks like + # + # class MyOperation(proto.Message): + # moniker = proto.Field(proto.STRING, number=1) + # status_msg = proto.Field(proto.STRING, number=2) + # optional http_error_code = proto.Field(proto.INT32, number=3) + # optional http_error_msg = proto.Field(proto.STRING, number=4) + # + # the ExtendedOperation subclass would provide property overrrides that map + # to these (poorly named) fields. + @property + def name(self): + return self._extended_operation.name + + @property + def status(self): + return self._extended_operation.status + + @property + def error_code(self): + return self._extended_operation.error_code + + @property + def error_message(self): + return self._extended_operation.error_message + + def done(self, retry=polling.DEFAULT_RETRY): + self._refresh_and_update(retry) + return self._extended_operation.done + + def cancel(self): + if self.done(): + return False + + self._cancel() + self._cancelled = True + return True + + def cancelled(self): + # TODO(dovs): there is not currently a good way to determine whether the + # operation has been cancelled. + # The best we can do is manually keep track of cancellation + # and check for doneness. + if not self._cancelled: + return False + + self._refresh_and_update() + return self._extended_operation.done + + def _refresh_and_update(self, retry=polling.DEFAULT_RETRY): + if not self._extended_operation.done: + self._extended_operation = self._refresh(retry=retry) + self._handle_refreshed_operation() + + def _handle_refreshed_operation(self): + with self._completion_lock: + if not self._extended_operation.done: + return + + if self.error_code and self.error_message: + exception = exceptions.from_http_status( + status_code=self.error_code, + message=self.error_message, + response=self._extended_operation, + ) + self.set_exception(exception) + elif self.error_code or self.error_message: + exception = exceptions.GoogleAPICallError( + f"Unexpected error {self.error_code}: {self.error_message}" + ) + self.set_exception(exception) + else: + # Extended operations have no payload. + self.set_result(None) + + @classmethod + def make(cls, refresh, cancel, extended_operation, **kwargs): + """ + Return an instantiated ExtendedOperation (or child) that wraps + * a refresh callable + * a cancel callable (can be a no-op) + * an initial result + + .. note:: + It is the caller's responsibility to set up refresh and cancel + with their correct request argument. + The reason for this is that the services that use Extended Operations + have rpcs that look something like the following: + + // service.proto + service MyLongService { + rpc StartLongTask(StartLongTaskRequest) returns (ExtendedOperation) { + option (google.cloud.operation_service) = "CustomOperationService"; + } + } + + service CustomOperationService { + rpc Get(GetOperationRequest) returns (ExtendedOperation) { + option (google.cloud.operation_polling_method) = true; + } + } + + Any info needed for the poll, e.g. a name, path params, etc. + is held in the request, which the initial client method is in a much + better position to make made because the caller made the initial request. + + TL;DR: the caller sets up closures for refresh and cancel that carry + the properly configured requests. + + Args: + refresh (Callable[Optional[Retry]][type(extended_operation)]): A callable that + returns the latest state of the operation. + cancel (Callable[][Any]): A callable that tries to cancel the operation + on a best effort basis. + extended_operation (Any): The initial response of the long running method. + See the docstring for ExtendedOperation.__init__ for requirements on + the type and fields of extended_operation + """ + return cls(extended_operation, refresh, cancel, **kwargs) diff --git a/noxfile.py b/noxfile.py index 003a276d..e02a1280 100644 --- a/noxfile.py +++ b/noxfile.py @@ -92,7 +92,7 @@ def default(session, install_grpc=True): ) # Install all test dependencies, then install this package in-place. - session.install("mock", "pytest", "pytest-cov") + session.install("dataclasses", "mock", "pytest", "pytest-cov", "pytest-xdist") if install_grpc: session.install("-e", ".[grpc]", "-c", constraints_path) else: @@ -102,28 +102,36 @@ def default(session, install_grpc=True): "python", "-m", "py.test", - "--quiet", - "--cov=google.api_core", - "--cov=tests.unit", - "--cov-append", - "--cov-config=.coveragerc", - "--cov-report=", - "--cov-fail-under=0", - os.path.join("tests", "unit"), + *( + # Helpful for running a single test or testfile. + session.posargs + or [ + "--quiet", + "--cov=google.api_core", + "--cov=tests.unit", + "--cov-append", + "--cov-config=.coveragerc", + "--cov-report=", + "--cov-fail-under=0", + # Running individual tests with parallelism enabled is usually not helpful. + "-n=auto", + os.path.join("tests", "unit"), + ] + ), ] - pytest_args.extend(session.posargs) # Inject AsyncIO content and proto-plus, if version >= 3.6. # proto-plus is needed for a field mask test in test_protobuf_helpers.py if _greater_or_equal_than_36(session.python): session.install("asyncmock", "pytest-asyncio", "proto-plus") - pytest_args.append("--cov=tests.asyncio") - pytest_args.append(os.path.join("tests", "asyncio")) - session.run(*pytest_args) - else: - # Run py.test against the unit tests. - session.run(*pytest_args) + # Having positional arguments means the user wants to run specific tests. + # Best not to add additional tests to that list. + if not session.posargs: + pytest_args.append("--cov=tests.asyncio") + pytest_args.append(os.path.join("tests", "asyncio")) + + session.run(*pytest_args) @nox.session(python=["3.6", "3.7", "3.8", "3.9", "3.10"]) @@ -171,7 +179,11 @@ def mypy(session): """Run type-checking.""" session.install(".[grpc, grpcgcp]", "mypy") session.install( - "types-setuptools", "types-requests", "types-protobuf", "types-mock" + "types-setuptools", + "types-requests", + "types-protobuf", + "types-mock", + "types-dataclasses", ) session.run("mypy", "google", "tests") diff --git a/tests/unit/test_extended_operation.py b/tests/unit/test_extended_operation.py new file mode 100644 index 00000000..7fcebed8 --- /dev/null +++ b/tests/unit/test_extended_operation.py @@ -0,0 +1,182 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +import enum +import typing + +import mock +import pytest + +from google.api_core import exceptions +from google.api_core import extended_operation +from google.api_core import retry + +TEST_OPERATION_NAME = "test/extended_operation" + + +@dataclasses.dataclass(frozen=True) +class CustomOperation: + class StatusCode(enum.Enum): + UNKNOWN = 0 + DONE = 1 + PENDING = 2 + + name: str + status: StatusCode + error_code: typing.Optional[int] = None + error_message: typing.Optional[str] = None + + # Note: in generated clients, this property must be generated for each + # extended operation message type. + # The status may be an enum, a string, or a bool. If it's a string or enum, + # its text is compared to the string "DONE". + @property + def done(self): + return self.status.name == "DONE" + + +def make_extended_operation(responses=None): + client_operations_responses = responses or [ + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.PENDING + ) + ] + + refresh = mock.Mock(spec=["__call__"], side_effect=client_operations_responses) + refresh.responses = client_operations_responses + cancel = mock.Mock(spec=["__call__"]) + extended_operation_future = extended_operation.ExtendedOperation.make( + refresh, cancel, client_operations_responses[0], + ) + + return extended_operation_future, refresh, cancel + + +def test_constructor(): + ex_op, refresh, _ = make_extended_operation() + assert ex_op._extended_operation == refresh.responses[0] + assert ex_op.cancelled() is False + assert ex_op.done() is False + assert ex_op.name == TEST_OPERATION_NAME + assert ex_op.status == CustomOperation.StatusCode.PENDING + assert ex_op.error_code is None + assert ex_op.error_message is None + + +def test_done(): + responses = [ + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.PENDING + ), + # Second response indicates that the operation has finished. + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.DONE + ), + # Bumper to make sure we stop polling on DONE. + CustomOperation( + name=TEST_OPERATION_NAME, + status=CustomOperation.StatusCode.DONE, + error_message="Gone too far!", + ), + ] + ex_op, refresh, _ = make_extended_operation(responses) + + # Start out not done. + assert not ex_op.done() + assert refresh.call_count == 1 + + # Refresh brings us to the done state. + assert ex_op.done() + assert refresh.call_count == 2 + assert not ex_op.error_message + + # Make sure that subsequent checks are no-ops. + assert ex_op.done() + assert refresh.call_count == 2 + assert not ex_op.error_message + + +def test_cancellation(): + responses = [ + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.PENDING + ), + # Second response indicates that the operation was cancelled. + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.DONE + ), + ] + ex_op, _, cancel = make_extended_operation(responses) + + assert not ex_op.cancelled() + + assert ex_op.cancel() + assert ex_op.cancelled() + cancel.assert_called_once_with() + + # Cancelling twice should have no effect. + assert not ex_op.cancel() + cancel.assert_called_once_with() + + +def test_done_w_retry(): + # Not sure what's going on here with the coverage, so just ignore it. + test_retry = retry.Retry(predicate=lambda x: True) # pragma: NO COVER + + responses = [ + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.PENDING + ), + CustomOperation( + name=TEST_OPERATION_NAME, status=CustomOperation.StatusCode.DONE + ), + ] + + ex_op, refresh, _ = make_extended_operation(responses) + + ex_op.done(retry=test_retry) + + refresh.assert_called_once_with(retry=test_retry) + + +def test_error(): + responses = [ + CustomOperation( + name=TEST_OPERATION_NAME, + status=CustomOperation.StatusCode.DONE, + error_code=400, + error_message="Bad request", + ), + ] + + ex_op, _, _ = make_extended_operation(responses) + + # Defaults to CallError when grpc is not installed + with pytest.raises(exceptions.BadRequest): + ex_op.result() + + # Inconsistent result + responses = [ + CustomOperation( + name=TEST_OPERATION_NAME, + status=CustomOperation.StatusCode.DONE, + error_code=2112, + ), + ] + + ex_op, _, _ = make_extended_operation(responses) + + with pytest.raises(exceptions.GoogleAPICallError): + ex_op.result()