From 2fb6e1533192ae81dceee5c71283169a0a85a015 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Fri, 4 Mar 2022 09:42:08 -0500 Subject: [PATCH] feat: exactly-once delivery support (#550) --- google/cloud/pubsub_v1/proto/pubsub.proto | 1 + .../subscriber/_protocol/dispatcher.py | 145 ++- .../pubsub_v1/subscriber/_protocol/leaser.py | 7 +- .../subscriber/_protocol/requests.py | 7 + .../_protocol/streaming_pull_manager.py | 289 +++++- .../cloud/pubsub_v1/subscriber/exceptions.py | 44 + google/cloud/pubsub_v1/subscriber/futures.py | 45 +- google/cloud/pubsub_v1/subscriber/message.py | 190 +++- google/cloud/pubsub_v1/types.py | 7 + noxfile.py | 10 +- setup.py | 1 + .../pubsub_v1/subscriber/test_dispatcher.py | 228 ++++- .../subscriber/test_futures_subscriber.py | 31 + .../unit/pubsub_v1/subscriber/test_leaser.py | 23 +- .../unit/pubsub_v1/subscriber/test_message.py | 103 ++- .../subscriber/test_messages_on_hold.py | 8 +- .../subscriber/test_streaming_pull_manager.py | 850 +++++++++++++++++- .../subscriber/test_subscriber_client.py | 2 +- 18 files changed, 1837 insertions(+), 154 deletions(-) create mode 100644 google/cloud/pubsub_v1/subscriber/exceptions.py diff --git a/google/cloud/pubsub_v1/proto/pubsub.proto b/google/cloud/pubsub_v1/proto/pubsub.proto index c5cb855d6..716c7ba05 100644 --- a/google/cloud/pubsub_v1/proto/pubsub.proto +++ b/google/cloud/pubsub_v1/proto/pubsub.proto @@ -1164,6 +1164,7 @@ message StreamingPullRequest { message StreamingPullResponse { // Subscription properties sent as part of the response. message SubscriptionProperties { + bool exactly_once_delivery_enabled = 1; // True iff message ordering is enabled for this subscription. bool message_ordering_enabled = 2; } diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 885210fc6..6ab5165d1 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -15,17 +15,19 @@ from __future__ import absolute_import from __future__ import division +import functools import itertools import logging import math +import time import threading import typing from typing import List, Optional, Sequence, Union import warnings +from google.api_core.retry import exponential_sleep_generator from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests -from google.pubsub_v1 import types as gapic_types if typing.TYPE_CHECKING: # pragma: NO COVER import queue @@ -66,6 +68,14 @@ IDs at a time. """ +_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1 +"""The time to wait for the first retry of failed acks and modacks when exactly-once +delivery is enabled.""" + +_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60 +"""The maximum amount of time in seconds to retry failed acks and modacks when +exactly-once delivery is enabled.""" + class Dispatcher(object): def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"): @@ -168,17 +178,66 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: # We must potentially split the request into multiple smaller requests # to avoid the server-side max request size limit. 
- ack_ids = (item.ack_id for item in items) + items_gen = iter(items) + ack_ids_gen = (item.ack_id for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): - request = gapic_types.StreamingPullRequest( - ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE) + ack_reqs_dict = { + req.ack_id: req + for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) + } + requests_completed, requests_to_retry = self._manager.send_unary_ack( + ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)), + ack_reqs_dict=ack_reqs_dict, + ) + + # Remove the completed messages from lease management. + self.drop(requests_completed) + + # Retry on a separate thread so the dispatcher thread isn't blocked + # by sleeps. + if requests_to_retry: + self._start_retry_thread( + "Thread-RetryAcks", + functools.partial(self._retry_acks, requests_to_retry), + ) + + def _start_retry_thread(self, thread_name, thread_target): + # note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue. + # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303 + # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418 + retry_thread = threading.Thread( + name=thread_name, target=thread_target, daemon=True, + ) + # The thread finishes when the requests succeed or eventually fail with + # a back-end timeout error or other permanent failure. + retry_thread.start() + + def _retry_acks(self, requests_to_retry): + retry_delay_gen = exponential_sleep_generator( + initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, + maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, + ) + while requests_to_retry: + time_to_wait = next(retry_delay_gen) + _LOGGER.debug( + "Retrying {len(requests_to_retry)} ack(s) after delay of " + + str(time_to_wait) + + " seconds" ) - self._manager.send(request) + time.sleep(time_to_wait) - # Remove the message from lease management. - self.drop(items) + ack_reqs_dict = {req.ack_id: req for req in requests_to_retry} + requests_completed, requests_to_retry = self._manager.send_unary_ack( + ack_ids=[req.ack_id for req in requests_to_retry], + ack_reqs_dict=ack_reqs_dict, + ) + assert ( + len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE + ), "Too many requests to be retried." + # Remove the completed messages from lease management. + self.drop(requests_completed) def drop( self, @@ -215,16 +274,58 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: """ # We must potentially split the request into multiple smaller requests # to avoid the server-side max request size limit. 
- ack_ids = (item.ack_id for item in items) - seconds = (item.seconds for item in items) + items_gen = iter(items) + ack_ids_gen = (item.ack_id for item in items) + deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): - request = gapic_types.StreamingPullRequest( - modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE), - modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE), + ack_reqs_dict = { + req.ack_id: req + for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) + } + # no further work needs to be done for `requests_to_retry` + requests_completed, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=list( + itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) + ), + modify_deadline_seconds=list( + itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) + ), + ack_reqs_dict=ack_reqs_dict, + ) + assert ( + len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE + ), "Too many requests to be retried." + + # Retry on a separate thread so the dispatcher thread isn't blocked + # by sleeps. + if requests_to_retry: + self._start_retry_thread( + "Thread-RetryModAcks", + functools.partial(self._retry_modacks, requests_to_retry), + ) + + def _retry_modacks(self, requests_to_retry): + retry_delay_gen = exponential_sleep_generator( + initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, + maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, + ) + while requests_to_retry: + time_to_wait = next(retry_delay_gen) + _LOGGER.debug( + "Retrying {len(requests_to_retry)} modack(s) after delay of " + + str(time_to_wait) + + " seconds" + ) + time.sleep(time_to_wait) + + ack_reqs_dict = {req.ack_id: req for req in requests_to_retry} + requests_completed, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry], + modify_deadline_seconds=[req.seconds for req in requests_to_retry], + ack_reqs_dict=ack_reqs_dict, ) - self._manager.send(request) def nack(self, items: Sequence[requests.NackRequest]) -> None: """Explicitly deny receipt of messages. @@ -233,6 +334,20 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None: items: The items to deny. """ self.modify_ack_deadline( - [requests.ModAckRequest(ack_id=item.ack_id, seconds=0) for item in items] + [ + requests.ModAckRequest( + ack_id=item.ack_id, seconds=0, future=item.future + ) + for item in items + ] + ) + self.drop( + [ + requests.DropRequest( + ack_id=item.ack_id, + byte_size=item.byte_size, + ordering_key=item.ordering_key, + ) + for item in items + ] ) - self.drop([requests.DropRequest(*item) for item in items]) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index bfa1b5a49..de110e992 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -181,7 +181,7 @@ def maintain_leases(self) -> None: for item in to_drop: leased_messages.pop(item.ack_id) - # Create a streaming pull request. + # Create a modack request. # We do not actually call `modify_ack_deadline` over and over # because it is more efficient to make a single request. ack_ids = leased_messages.keys() @@ -194,9 +194,8 @@ def maintain_leases(self) -> None: # way for ``send_request`` to fail when the consumer # is inactive. 
assert self._manager.dispatcher is not None - self._manager.dispatcher.modify_ack_deadline( - [requests.ModAckRequest(ack_id, deadline) for ack_id in ack_ids] - ) + ack_id_gen = (ack_id for ack_id in ack_ids) + self._manager._send_lease_modacks(ack_id_gen, deadline) # Now wait an appropriate period of time and do this again. # diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py index 7481d95a9..9cd387545 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing from typing import NamedTuple, Optional +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1.subscriber import futures + # Namedtuples for management requests. Used by the Message class to communicate # items of work back to the policy. @@ -22,6 +26,7 @@ class AckRequest(NamedTuple): byte_size: int time_to_ack: float ordering_key: Optional[str] + future: Optional["futures.Future"] class DropRequest(NamedTuple): @@ -39,9 +44,11 @@ class LeaseRequest(NamedTuple): class ModAckRequest(NamedTuple): ack_id: str seconds: float + future: Optional["futures.Future"] class NackRequest(NamedTuple): ack_id: str byte_size: int ordering_key: Optional[str] + future: Optional["futures.Future"] diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d207718fc..5a9d08026 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Callable, Iterable, List, Optional, Union +from typing import Any, Callable, Iterable, List, Optional, Tuple, Union import uuid import grpc # type: ignore @@ -34,12 +34,22 @@ from google.cloud.pubsub_v1.subscriber._protocol import leaser from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeError, + AcknowledgeStatus, +) import google.cloud.pubsub_v1.subscriber.message +from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler from google.pubsub_v1 import types as gapic_types +from grpc_status import rpc_status # type: ignore +from google.rpc.error_details_pb2 import ErrorInfo # type: ignore +from google.rpc import code_pb2 # type: ignore +from google.rpc import status_pb2 if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import subscriber + from google.protobuf.internal import containers _LOGGER = logging.getLogger(__name__) @@ -60,6 +70,11 @@ _RESUME_THRESHOLD = 0.8 """The load threshold below which to resume the incoming message stream.""" +_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60 +"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for +a subscription. We do this to reduce premature ack expiration. +""" + def _wrap_as_exception(maybe_exception: Any) -> BaseException: """Wrap an object as a Python exception, if needed. 
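The dispatcher changes earlier in this patch retry transient ack/modack failures on a dedicated daemon thread, sleeping with exponential backoff between one second and ten minutes between attempts. A minimal sketch of that loop, assuming a send_unary_ack callable with the (ack_ids, ack_reqs_dict) -> (requests_completed, requests_to_retry) signature introduced by this patch:

    import time
    from google.api_core.retry import exponential_sleep_generator

    def retry_transient_acks(send_unary_ack, requests_to_retry):
        # Delays grow from 1 second up to a 10-minute cap, mirroring the
        # _MIN/_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS constants.
        delay_gen = exponential_sleep_generator(initial=1, maximum=10 * 60)
        while requests_to_retry:
            time.sleep(next(delay_gen))
            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            _, requests_to_retry = send_unary_ack(
                ack_ids=list(ack_reqs_dict), ack_reqs_dict=ack_reqs_dict
            )

The loop exits once every request has either succeeded or failed permanently, which is why the retry thread needs no explicit shutdown signal.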
@@ -104,6 +119,86 @@ def _wrap_callback_errors( on_callback_error(exc) +def _get_status(exc: exceptions.GoogleAPICallError,) -> Optional["status_pb2.Status"]: + if not exc.response: + _LOGGER.debug("No response obj in errored RPC call.") + return None + try: + return rpc_status.from_call(exc.response) + # Possible "If the gRPC call’s code or details are inconsistent + # with the status code and message inside of the + # google.rpc.status.Status" + except ValueError: + _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True) + return None + + +def _get_ack_errors( + exc: exceptions.GoogleAPICallError, +) -> Optional["containers.ScalarMap"]: + status = _get_status(exc) + if not status: + _LOGGER.debug("Unable to get status of errored RPC.") + return None + for detail in status.details: + info = ErrorInfo() + if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)): + _LOGGER.debug("Unable to unpack ErrorInfo.") + return None + return info.metadata + return None + + +def _process_requests( + error_status: Optional["status_pb2.Status"], + ack_reqs_dict: "containers.ScalarMap", + errors_dict: Optional["containers.ScalarMap"], +): + """Process requests by referring to error_status and errors_dict. + + The errors returned by the server in as `error_status` or in `errors_dict` + are used to complete the request futures in `ack_reqs_dict` (with a success + or exception) or to return requests for further retries. + """ + requests_completed = [] + requests_to_retry = [] + for ack_id in ack_reqs_dict: + if errors_dict and ack_id in errors_dict: + exactly_once_error = errors_dict[ack_id] + if exactly_once_error.startswith("TRANSIENT_"): + requests_to_retry.append(ack_reqs_dict[ack_id]) + else: + if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID": + exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) + else: + exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) + + future = ack_reqs_dict[ack_id].future + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) + elif error_status: + # Only permanent errors are expected here b/c retriable errors are + # retried at the lower, GRPC level. + if error_status.code == code_pb2.PERMISSION_DENIED: + exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None) + elif error_status.code == code_pb2.FAILED_PRECONDITION: + exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) + else: + exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) + future = ack_reqs_dict[ack_id].future + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) + elif ack_reqs_dict[ack_id].future: + future = ack_reqs_dict[ack_id].future + # success + future.set_result(AcknowledgeStatus.SUCCESS) + requests_completed.append(ack_reqs_dict[ack_id]) + else: + requests_completed.append(ack_reqs_dict[ack_id]) + + return requests_completed, requests_to_retry + + class StreamingPullManager(object): """The streaming pull manager coordinates pulling messages from Pub/Sub, leasing them, and scheduling them to be processed. 
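_process_requests is the routine that decides, for each ack ID, whether the request should be retried or its future completed. A worked example, assuming the code in this patch is importable; the error strings are illustrative, since only the TRANSIENT_ prefix and the PERMANENT_FAILURE_INVALID_ACK_ID value matter to the logic above:

    from google.cloud.pubsub_v1.subscriber import futures
    from google.cloud.pubsub_v1.subscriber._protocol import requests as sub_requests
    from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager as spm

    def make_req(ack_id):
        return sub_requests.AckRequest(
            ack_id=ack_id, byte_size=0, time_to_ack=20,
            ordering_key="", future=futures.Future(),
        )

    ack_reqs = {ack_id: make_req(ack_id) for ack_id in ("a", "b", "c")}
    errors = {
        "a": "TRANSIENT_FAILURE_SERVICE_UNAVAILABLE",  # illustrative transient code
        "b": "PERMANENT_FAILURE_INVALID_ACK_ID",
    }
    completed, to_retry = spm._process_requests(None, ack_reqs, errors)
    # "a" is returned in to_retry; "b"'s future now holds an AcknowledgeError
    # with INVALID_ACK_ID; "c"'s future resolves to AcknowledgeStatus.SUCCESS.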
@@ -148,6 +243,7 @@ def __init__( ): self._client = client self._subscription = subscription + self._exactly_once_enabled = False self._flow_control = flow_control self._use_legacy_flow_control = use_legacy_flow_control self._await_callbacks_on_shutdown = await_callbacks_on_shutdown @@ -159,6 +255,8 @@ def __init__( self._closing = threading.Lock() self._closed = False self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = [] + # Guarded by self._exactly_once_enabled_lock + self._send_new_ack_deadline = False # A shutdown thread is created on intentional shutdown. self._regular_shutdown_thread: Optional[threading.Thread] = None @@ -189,6 +287,12 @@ def __init__( # currently on hold. self._pause_resume_lock = threading.Lock() + # A lock guarding the self._exactly_once_enabled variable. We may also + # acquire the self._ack_deadline_lock while this lock is held, but not + # the reverse. So, we maintain a simple ordering of these two locks to + # prevent deadlocks. + self._exactly_once_enabled_lock = threading.Lock() + # A lock protecting the current ACK deadline used in the lease management. This # value can be potentially updated both by the leaser thread and by the message # consumer thread when invoking the internal _on_response() callback. @@ -273,6 +377,22 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> float: histogram.MIN_ACK_DEADLINE, ) self._ack_deadline = min(self._ack_deadline, flow_control_setting) + + # If the user explicitly sets a min ack_deadline, respect it. + if self.flow_control.min_duration_per_lease_extension > 0: + # The setting in flow control could be too high, adjust if needed. + flow_control_setting = min( + self.flow_control.min_duration_per_lease_extension, + histogram.MAX_ACK_DEADLINE, + ) + self._ack_deadline = max(self._ack_deadline, flow_control_setting) + elif self._exactly_once_enabled: + # Higher minimum ack_deadline for subscriptions with + # exactly-once delivery enabled. + self._ack_deadline = max( + self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED + ) + return self._ack_deadline @property @@ -311,7 +431,7 @@ def load(self) -> float: ) def add_close_callback( - self, callback: Callable[["StreamingPullManager", Any], Any], + self, callback: Callable[["StreamingPullManager", Any], Any] ) -> None: """Schedules a callable when the manager closes. @@ -435,49 +555,88 @@ def _schedule_message_on_hold( assert self._callback is not None self._scheduler.schedule(self._callback, msg) - def _send_unary_request(self, request: gapic_types.StreamingPullRequest) -> None: + def send_unary_ack( + self, ack_ids, ack_reqs_dict + ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]: """Send a request using a separate unary request instead of over the stream. - Args: - request: The stream request to be mapped into unary requests. + If a RetryError occurs, the manager shutdown is triggered, and the + error is re-raised. """ - if request.ack_ids: - self._client.acknowledge( # type: ignore - subscription=self._subscription, ack_ids=list(request.ack_ids) + assert ack_ids + assert len(ack_ids) == len(ack_reqs_dict) + + error_status = None + ack_errors_dict = None + try: + self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids) + except exceptions.GoogleAPICallError as exc: + _LOGGER.debug( + "Exception while sending unary RPC. 
This is typically " + "non-fatal as stream requests are best-effort.", + exc_info=True, ) + error_status = _get_status(exc) + ack_errors_dict = _get_ack_errors(exc) + except exceptions.RetryError as exc: + status = status_pb2.Status() + status.code = code_pb2.DEADLINE_EXCEEDED + # Makes sure to complete futures so they don't block forever. + _process_requests(status, ack_reqs_dict, None) + _LOGGER.debug( + "RetryError while sending unary RPC. Waiting on a transient " + "error resolution for too long, will now trigger shutdown.", + exc_info=False, + ) + # The underlying channel has been suffering from a retryable error + # for too long, time to give up and shut the streaming pull down. + self._on_rpc_done(exc) + raise - if request.modify_deadline_ack_ids: + requests_completed, requests_to_retry = _process_requests( + error_status, ack_reqs_dict, ack_errors_dict + ) + return requests_completed, requests_to_retry + + def send_unary_modack( + self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict + ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: + """Send a request using a separate unary request instead of over the stream. + + If a RetryError occurs, the manager shutdown is triggered, and the + error is re-raised. + """ + assert modify_deadline_ack_ids + + error_status = None + modack_errors_dict = None + try: # Send ack_ids with the same deadline seconds together. deadline_to_ack_ids = collections.defaultdict(list) - for n, ack_id in enumerate(request.modify_deadline_ack_ids): - deadline = request.modify_deadline_seconds[n] + for n, ack_id in enumerate(modify_deadline_ack_ids): + deadline = modify_deadline_seconds[n] deadline_to_ack_ids[deadline].append(ack_id) for deadline, ack_ids in deadline_to_ack_ids.items(): - self._client.modify_ack_deadline( # type: ignore + self._client.modify_ack_deadline( subscription=self._subscription, ack_ids=ack_ids, ack_deadline_seconds=deadline, ) - - _LOGGER.debug("Sent request(s) over unary RPC.") - - def send(self, request: gapic_types.StreamingPullRequest) -> None: - """Queue a request to be sent to the RPC. - - If a RetryError occurs, the manager shutdown is triggered, and the - error is re-raised. - """ - try: - self._send_unary_request(request) - except exceptions.GoogleAPICallError: + except exceptions.GoogleAPICallError as exc: _LOGGER.debug( "Exception while sending unary RPC. This is typically " "non-fatal as stream requests are best-effort.", exc_info=True, ) + error_status = _get_status(exc) + modack_errors_dict = _get_ack_errors(exc) except exceptions.RetryError as exc: + status = status_pb2.Status() + status.code = code_pb2.DEADLINE_EXCEEDED + # Makes sure to complete futures so they don't block forever. + _process_requests(status, ack_reqs_dict, None) _LOGGER.debug( "RetryError while sending unary RPC. Waiting on a transient " "error resolution for too long, will now trigger shutdown.", @@ -488,14 +647,34 @@ def send(self, request: gapic_types.StreamingPullRequest) -> None: self._on_rpc_done(exc) raise + requests_completed, requests_to_retry = _process_requests( + error_status, ack_reqs_dict, modack_errors_dict + ) + return requests_completed, requests_to_retry + def heartbeat(self) -> bool: - """Sends an empty request over the streaming pull RPC. + """Sends a heartbeat request over the streaming pull RPC. + + The request is empty by default, but may contain the current ack_deadline + if the self._exactly_once_enabled flag has changed. Returns: If a heartbeat request has actually been sent. 
""" if self._rpc is not None and self._rpc.is_active: - self._rpc.send(gapic_types.StreamingPullRequest()) + send_new_ack_deadline = False + with self._exactly_once_enabled_lock: + send_new_ack_deadline = self._send_new_ack_deadline + self._send_new_ack_deadline = False + + if send_new_ack_deadline: + request = gapic_types.StreamingPullRequest( + stream_ack_deadline_seconds=self.ack_deadline + ) + else: + request = gapic_types.StreamingPullRequest() + + self._rpc.send(request) return True return False @@ -700,6 +879,42 @@ def _get_initial_request( # Return the initial request. return request + def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float): + exactly_once_enabled = False + with self._exactly_once_enabled_lock: + exactly_once_enabled = self._exactly_once_enabled + if exactly_once_enabled: + items = [] + for ack_id in ack_ids: + future = futures.Future() + request = requests.ModAckRequest(ack_id, ack_deadline, future) + items.append(request) + + assert self._dispatcher is not None + self._dispatcher.modify_ack_deadline(items) + + for req in items: + try: + assert req.future is not None + req.future.result() + except AcknowledgeError: + _LOGGER.warning( + "AcknowledgeError when lease-modacking a message.", + exc_info=True, + ) + else: + items = [ + requests.ModAckRequest(ack_id, self.ack_deadline, None) + for ack_id in ack_ids + ] + assert self._dispatcher is not None + self._dispatcher.modify_ack_deadline(items) + + def _exactly_once_delivery_enabled(self) -> bool: + """Whether exactly-once delivery is enabled for the subscription.""" + with self._exactly_once_enabled_lock: + return self._exactly_once_enabled + def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: """Process all received Pub/Sub messages. @@ -730,15 +945,24 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: self._on_hold_bytes, ) + with self._exactly_once_enabled_lock: + if ( + response.subscription_properties.exactly_once_delivery_enabled + != self._exactly_once_enabled + ): + self._exactly_once_enabled = ( + response.subscription_properties.exactly_once_delivery_enabled + ) + # Update ack_deadline, whose minimum depends on self._exactly_once_enabled + # This method acquires the self._ack_deadline_lock lock. + self._obtain_ack_deadline(maybe_update=True) + self._send_new_ack_deadline = True + # Immediately (i.e. without waiting for the auto lease management) # modack the messages we received, as this tells the server that we've # received them. - items = [ - requests.ModAckRequest(message.ack_id, self.ack_deadline) - for message in received_messages - ] - assert self._dispatcher is not None - self._dispatcher.modify_ack_deadline(items) + ack_id_gen = (message.ack_id for message in received_messages) + self._send_lease_modacks(ack_id_gen, self.ack_deadline) with self._pause_resume_lock: assert self._scheduler is not None @@ -750,6 +974,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: received_message.ack_id, received_message.delivery_attempt, self._scheduler.queue, + self._exactly_once_delivery_enabled, ) self._messages_on_hold.put(message) self._on_hold_bytes += message.size diff --git a/google/cloud/pubsub_v1/subscriber/exceptions.py b/google/cloud/pubsub_v1/subscriber/exceptions.py new file mode 100644 index 000000000..a5dad31a9 --- /dev/null +++ b/google/cloud/pubsub_v1/subscriber/exceptions.py @@ -0,0 +1,44 @@ +# Copyright 2017, Google LLC All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from enum import Enum +from google.api_core.exceptions import GoogleAPICallError +from typing import Optional + + +class AcknowledgeStatus(Enum): + SUCCESS = 1 + PERMISSION_DENIED = 2 + FAILED_PRECONDITION = 3 + INVALID_ACK_ID = 4 + OTHER = 5 + + +class AcknowledgeError(GoogleAPICallError): + """Error during ack/modack/nack operation on exactly-once-enabled subscription.""" + + def __init__(self, error_code: AcknowledgeStatus, info: Optional[str]): + self.error_code = error_code + self.info = info + message = None + if info: + message = str(self.error_code) + " : " + str(self.info) + else: + message = str(self.error_code) + super(AcknowledgeError, self).__init__(message) + + +__all__ = ("AcknowledgeError",) diff --git a/google/cloud/pubsub_v1/subscriber/futures.py b/google/cloud/pubsub_v1/subscriber/futures.py index a024ba698..f043b7eb5 100644 --- a/google/cloud/pubsub_v1/subscriber/futures.py +++ b/google/cloud/pubsub_v1/subscriber/futures.py @@ -16,9 +16,10 @@ import typing from typing import Any +from typing import Union from google.cloud.pubsub_v1 import futures - +from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( @@ -80,3 +81,45 @@ def cancelled(self) -> bool: ``True`` if the subscription has been cancelled. """ return self.__cancelled + + +class Future(futures.Future): + """This future object is for subscribe-side calls. + + Calling :meth:`result` will resolve the future by returning the message + ID, unless an error occurs. + """ + + def cancel(self) -> bool: + """Actions in Pub/Sub generally may not be canceled. + + This method always returns ``False``. + """ + return False + + def cancelled(self) -> bool: + """Actions in Pub/Sub generally may not be canceled. + + This method always returns ``False``. + """ + return False + + def result(self, timeout: Union[int, float] = None) -> AcknowledgeStatus: + """Return a success code or raise an exception. + + This blocks until the operation completes successfully and + returns the error code unless an exception is raised. + + Args: + timeout: The number of seconds before this call + times out and raises TimeoutError. + + Returns: + AcknowledgeStatus.SUCCESS if the operation succeeded. + + Raises: + concurrent.futures.TimeoutError: If the request times out. + AcknowledgeError: If the operation did not succeed for another + reason. 
+ """ + return super().result(timeout=timeout) diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 2d72bba57..5744aa71c 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -19,9 +19,12 @@ import math import time import typing -from typing import Optional +from typing import Optional, Callable from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber import futures +from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus + if typing.TYPE_CHECKING: # pragma: NO COVER import datetime @@ -87,6 +90,7 @@ def __init__( ack_id: str, delivery_attempt: int, request_queue: "queue.Queue", + exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False, ): """Construct the Message. @@ -110,11 +114,14 @@ def __init__( request_queue: A queue provided by the policy that can accept requests; the policy is responsible for handling those requests. + exactly_once_delivery_enabled_func: + A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False. """ self._message = message self._ack_id = ack_id self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None self._request_queue = request_queue + self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func self.message_id = message.message_id # The instantiation time is the time that this message @@ -233,7 +240,10 @@ def ack(self) -> None: .. warning:: Acks in Pub/Sub are best effort. You should always ensure that your processing code is idempotent, as you may - receive any given message more than once. + receive any given message more than once. If you need strong + guarantees about acks and re-deliveres, enable exactly-once + delivery on your subscription and use the `ack_with_response` + method instead. """ time_to_ack = math.ceil(time.time() - self._received_timestamp) self._request_queue.put( @@ -242,9 +252,63 @@ def ack(self) -> None: byte_size=self.size, time_to_ack=time_to_ack, ordering_key=self.ordering_key, + future=None, ) ) + def ack_with_response(self) -> "futures.Future": + """Acknowledge the given message. + + Acknowledging a message in Pub/Sub means that you are done + with it, and it will not be delivered to this subscription again. + You should avoid acknowledging messages until you have + *finished* processing them, so that in the event of a failure, + you receive the message again. + + If exactly-once delivery is enabled on the subscription, the + future returned by this method tracks the state of acknowledgement + operation. If the future completes successfully, the message is + guaranteed NOT to be re-delivered. Otherwise, the future will + contain an exception with more details about the failure and the + message may be re-delivered. + + If exactly-once delivery is NOT enabled on the subscription, the + future returns immediately with an AcknowledgeStatus.SUCCESS. + Since acks in Cloud Pub/Sub are best effort when exactly-once + delivery is disabled, the message may be re-delivered. Because + re-deliveries are possible, you should ensure that your processing + code is idempotent, as you may receive any given message more than + once. + + Returns: + A :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` + instance that conforms to Python Standard library's + :class:`~concurrent.futures.Future` interface (but not an + instance of that class). 
Call `result()` to get the result + of the operation; upon success, a + pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS + will be returned and upon an error, an + pubsub_v1.subscriber.exceptions.AcknowledgeError exception + will be thrown. + """ + future = futures.Future() + req_future = None + if self._exactly_once_delivery_enabled_func(): + req_future = future + else: + future.set_result(AcknowledgeStatus.SUCCESS) + time_to_ack = math.ceil(time.time() - self._received_timestamp) + self._request_queue.put( + requests.AckRequest( + ack_id=self._ack_id, + byte_size=self.size, + time_to_ack=time_to_ack, + ordering_key=self.ordering_key, + future=req_future, + ) + ) + return future + def drop(self) -> None: """Release the message from lease management. @@ -269,8 +333,8 @@ def modify_ack_deadline(self, seconds: int) -> None: New deadline will be the given value of seconds from now. - The default implementation handles this for you; you should not need - to manually deal with setting ack deadlines. The exception case is + The default implementation handles automatically modacking received messages for you; + you should not need to manually deal with setting ack deadlines. The exception case is if you are implementing your own custom subclass of :class:`~.pubsub_v1.subcriber._consumer.Consumer`. @@ -281,16 +345,126 @@ def modify_ack_deadline(self, seconds: int) -> None: against. """ self._request_queue.put( - requests.ModAckRequest(ack_id=self._ack_id, seconds=seconds) + requests.ModAckRequest(ack_id=self._ack_id, seconds=seconds, future=None) ) + def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": + """Resets the deadline for acknowledgement and returns the response + status via a future. + + New deadline will be the given value of seconds from now. + + The default implementation handles automatically modacking received messages for you; + you should not need to manually deal with setting ack deadlines. The exception case is + if you are implementing your own custom subclass of + :class:`~.pubsub_v1.subcriber._consumer.Consumer`. + + If exactly-once delivery is enabled on the subscription, the + future returned by this method tracks the state of the + modify-ack-deadline operation. If the future completes successfully, + the message is guaranteed NOT to be re-delivered within the new deadline. + Otherwise, the future will contain an exception with more details about + the failure and the message will be redelivered according to its + currently-set ack deadline. + + If exactly-once delivery is NOT enabled on the subscription, the + future returns immediately with an AcknowledgeStatus.SUCCESS. + Since modify-ack-deadline operations in Cloud Pub/Sub are best effort + when exactly-once delivery is disabled, the message may be re-delivered + within the set deadline. + + Args: + seconds: + The number of seconds to set the lease deadline to. This should be + between 0 and 600. Due to network latency, values below 10 are advised + against. + Returns: + A :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` + instance that conforms to Python Standard library's + :class:`~concurrent.futures.Future` interface (but not an + instance of that class). Call `result()` to get the result + of the operation; upon success, a + pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS + will be returned and upon an error, an + pubsub_v1.subscriber.exceptions.AcknowledgeError exception + will be thrown. 
+ + """ + future = futures.Future() + req_future = None + if self._exactly_once_delivery_enabled_func(): + req_future = future + else: + future.set_result(AcknowledgeStatus.SUCCESS) + + self._request_queue.put( + requests.ModAckRequest( + ack_id=self._ack_id, seconds=seconds, future=req_future + ) + ) + + return future + def nack(self) -> None: - """Decline to acknowldge the given message. + """Decline to acknowledge the given message. - This will cause the message to be re-delivered to the subscription. + This will cause the message to be re-delivered to subscribers. Re-deliveries + may take place immediately or after a delay, and may arrive at this subscriber + or another. """ self._request_queue.put( requests.NackRequest( - ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key + ack_id=self._ack_id, + byte_size=self.size, + ordering_key=self.ordering_key, + future=None, + ) + ) + + def nack_with_response(self) -> "futures.Future": + """Decline to acknowledge the given message, returning the response status via + a future. + + This will cause the message to be re-delivered to subscribers. Re-deliveries + may take place immediately or after a delay, and may arrive at this subscriber + or another. + + If exactly-once delivery is enabled on the subscription, the + future returned by this method tracks the state of the + nack operation. If the future completes successfully, + the future's result will be an AcknowledgeStatus.SUCCESS. + Otherwise, the future will contain an exception with more details about + the failure. + + If exactly-once delivery is NOT enabled on the subscription, the + future returns immediately with an AcknowledgeStatus.SUCCESS. + + Returns: + A :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` + instance that conforms to Python Standard library's + :class:`~concurrent.futures.Future` interface (but not an + instance of that class). Call `result()` to get the result + of the operation; upon success, a + pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS + will be returned and upon an error, an + pubsub_v1.subscriber.exceptions.AcknowledgeError exception + will be thrown. + + """ + future = futures.Future() + req_future = None + if self._exactly_once_delivery_enabled_func(): + req_future = future + else: + future.set_result(AcknowledgeStatus.SUCCESS) + + self._request_queue.put( + requests.NackRequest( + ack_id=self._ack_id, + byte_size=self.size, + ordering_key=self.ordering_key, + future=req_future, ) ) + + return future diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index e843a6da9..109d4aadc 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -162,6 +162,13 @@ class FlowControl(NamedTuple): "before dropping it from the lease management." ) + min_duration_per_lease_extension: float = 0 + ( + "The min amount of time in seconds for a single lease extension attempt. " + "Must be between 10 and 600 (inclusive). Ignored by default, but set to " + "60 seconds if the subscription has exactly-once delivery enabled." + ) + max_duration_per_lease_extension: float = 0 # disabled by default ( "The max amount of time in seconds for a single lease extension attempt. " diff --git a/noxfile.py b/noxfile.py index e9fea8af8..ba16d5a46 100644 --- a/noxfile.py +++ b/noxfile.py @@ -146,6 +146,9 @@ def default(session): session.install("-e", ".", "-c", constraints_path) # Run py.test against the unit tests. 
+ # THe following flags are useful during development: + # "-s" -> show print() statement output + # "-k " -> filter test cases session.run( "py.test", "--quiet", @@ -156,6 +159,7 @@ def default(session): "--cov-config=.coveragerc", "--cov-report=", "--cov-fail-under=0", + "-s", os.path.join("tests", "unit"), *session.posargs, ) @@ -200,6 +204,9 @@ def system(session): session.install("-e", ".", "-c", constraints_path) # Run py.test against the system tests. + # THe following flags are useful during development: + # "-s" -> show print() statement output + # "-k " -> filter test cases if system_test_exists: session.run( "py.test", @@ -226,7 +233,8 @@ def cover(session): test runs (not system test runs), and then erases coverage data. """ session.install("coverage", "pytest-cov") - session.run("coverage", "report", "--show-missing", "--fail-under=100") + # Tip: The "-i" flag lets you ignore errors with specific files. + session.run("coverage", "report", "-i", "--show-missing", "--fail-under=100") session.run("coverage", "erase") diff --git a/setup.py b/setup.py index 666fa61aa..5423bbb0d 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ "google-api-core[grpc] >= 1.28.0, <3.0.0dev", "proto-plus >= 1.7.1", "grpc-google-iam-v1 >= 0.12.3, < 0.13dev", + "grpcio-status >= 1.16.0", ] extras = {"libcst": "libcst >= 0.3.10"} diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 539ae40c7..bbc6170e2 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -15,13 +15,12 @@ import collections import queue import threading -import warnings from google.cloud.pubsub_v1.subscriber._protocol import dispatcher from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.subscriber import futures import mock import pytest @@ -30,11 +29,11 @@ @pytest.mark.parametrize( "item,method_name", [ - (requests.AckRequest("0", 0, 0, ""), "ack"), + (requests.AckRequest("0", 0, 0, "", None), "ack"), (requests.DropRequest("0", 0, ""), "drop"), (requests.LeaseRequest("0", 0, ""), "lease"), - (requests.ModAckRequest("0", 0), "modify_ack_deadline"), - (requests.NackRequest("0", 0, ""), "nack"), + (requests.ModAckRequest("0", 0, None), "modify_ack_deadline"), + (requests.NackRequest("0", 0, "", None), "nack"), ], ) def test_dispatch_callback_active_manager(item, method_name): @@ -54,11 +53,11 @@ def test_dispatch_callback_active_manager(item, method_name): @pytest.mark.parametrize( "item,method_name", [ - (requests.AckRequest("0", 0, 0, ""), "ack"), + (requests.AckRequest("0", 0, 0, "", None), "ack"), (requests.DropRequest("0", 0, ""), "drop"), (requests.LeaseRequest("0", 0, ""), "lease"), - (requests.ModAckRequest("0", 0), "modify_ack_deadline"), - (requests.NackRequest("0", 0, ""), "nack"), + (requests.ModAckRequest("0", 0, None), "modify_ack_deadline"), + (requests.NackRequest("0", 0, "", None), "nack"), ], ) def test_dispatch_callback_inactive_manager(item, method_name): @@ -76,24 +75,15 @@ def test_dispatch_callback_inactive_manager(item, method_name): method.assert_called_once_with([item]) -def test_dispatch_callback_inactive_manager_unknown_request(): +def test_unknown_request_type(): manager = mock.create_autospec( 
streaming_pull_manager.StreamingPullManager, instance=True ) - manager.is_active = False dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) - FooType = type("FooType", (), {}) - items = [FooType()] - - with warnings.catch_warnings(record=True) as warned: - dispatcher_.dispatch_callback(items) - - assert len(warned) == 1 - assert issubclass(warned[0].category, RuntimeWarning) - warning_msg = str(warned[0].message) - assert "unknown request item" in warning_msg - assert "FooType" in warning_msg + items = ["a random string, not a known request type"] + manager.send_unary_ack.return_value = (items, []) + dispatcher_.dispatch_callback(items) def test_ack(): @@ -104,13 +94,18 @@ def test_ack(): items = [ requests.AckRequest( - ack_id="ack_id_string", byte_size=0, time_to_ack=20, ordering_key="" + ack_id="ack_id_string", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ) ] + manager.send_unary_ack.return_value = (items, []) dispatcher_.ack(items) - manager.send.assert_called_once_with( - gapic_types.StreamingPullRequest(ack_ids=["ack_id_string"]) + manager.send_unary_ack.assert_called_once_with( + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ) manager.leaser.remove.assert_called_once_with(items) @@ -126,13 +121,18 @@ def test_ack_no_time(): items = [ requests.AckRequest( - ack_id="ack_id_string", byte_size=0, time_to_ack=None, ordering_key="" + ack_id="ack_id_string", + byte_size=0, + time_to_ack=None, + ordering_key="", + future=None, ) ] + manager.send_unary_ack.return_value = (items, []) dispatcher_.ack(items) - manager.send.assert_called_once_with( - gapic_types.StreamingPullRequest(ack_ids=["ack_id_string"]) + manager.send_unary_ack.assert_called_once_with( + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ) manager.ack_histogram.add.assert_not_called() @@ -147,27 +147,152 @@ def test_ack_splitting_large_payload(): items = [ # use realistic lengths for ACK IDs (max 176 bytes) requests.AckRequest( - ack_id=str(i).zfill(176), byte_size=0, time_to_ack=20, ordering_key="" + ack_id=str(i).zfill(176), + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ) for i in range(5001) ] + manager.send_unary_ack.return_value = (items, []) dispatcher_.ack(items) - calls = manager.send.call_args_list + calls = manager.send_unary_ack.call_args_list assert len(calls) == 3 all_ack_ids = {item.ack_id for item in items} sent_ack_ids = collections.Counter() for call in calls: - message = call.args[0] - assert message._pb.ByteSize() <= 524288 # server-side limit (2**19) - sent_ack_ids.update(message.ack_ids) + ack_ids = call[1]["ack_ids"] + assert len(ack_ids) <= dispatcher._ACK_IDS_BATCH_SIZE + sent_ack_ids.update(ack_ids) assert set(sent_ack_ids) == all_ack_ids # all messages should have been ACK-ed assert sent_ack_ids.most_common(1)[0][1] == 1 # each message ACK-ed exactly once +def test_retry_acks_in_new_thread(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + f = futures.Future() + items = [ + requests.AckRequest( + ack_id="ack_id_string", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=f, + ) + ] + # failure triggers creation of new retry thread + manager.send_unary_ack.side_effect = [([], items)] + with mock.patch("time.sleep", return_value=None): + with mock.patch.object(threading, "Thread", autospec=True) as Thread: + dispatcher_.ack(items) + + assert len(Thread.mock_calls) == 2 + 
ctor_call = Thread.mock_calls[0] + assert ctor_call.kwargs["name"] == "Thread-RetryAcks" + assert ctor_call.kwargs["target"].args[0] == items + assert ctor_call.kwargs["daemon"] + + +def test_retry_acks(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + f = futures.Future() + items = [ + requests.AckRequest( + ack_id="ack_id_string", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=f, + ) + ] + # first and second `send_unary_ack` calls fail, third one succeeds + manager.send_unary_ack.side_effect = [([], items), ([], items), (items, [])] + with mock.patch("time.sleep", return_value=None): + dispatcher_._retry_acks(items) + + manager.send_unary_ack.assert_has_calls( + [ + mock.call( + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} + ), + mock.call( + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} + ), + mock.call( + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} + ), + ] + ) + + +def test_retry_modacks_in_new_thread(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + f = futures.Future() + items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=20, future=f,)] + # failure triggers creation of new retry thread + manager.send_unary_modack.side_effect = [([], items)] + with mock.patch("time.sleep", return_value=None): + with mock.patch.object(threading, "Thread", autospec=True) as Thread: + dispatcher_.modify_ack_deadline(items) + + assert len(Thread.mock_calls) == 2 + ctor_call = Thread.mock_calls[0] + assert ctor_call.kwargs["name"] == "Thread-RetryModAcks" + assert ctor_call.kwargs["target"].args[0] == items + assert ctor_call.kwargs["daemon"] + + +def test_retry_modacks(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + f = futures.Future() + items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=20, future=f,)] + # first and second calls fail, third one succeeds + manager.send_unary_modack.side_effect = [([], items), ([], items), (items, [])] + with mock.patch("time.sleep", return_value=None): + dispatcher_._retry_modacks(items) + + manager.send_unary_modack.assert_has_calls( + [ + mock.call( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[20], + ack_reqs_dict={"ack_id_string": items[0]}, + ), + mock.call( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[20], + ack_reqs_dict={"ack_id_string": items[0]}, + ), + mock.call( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[20], + ack_reqs_dict={"ack_id_string": items[0]}, + ), + ] + ) + + def test_lease(): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -224,14 +349,21 @@ def test_nack(): dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [ - requests.NackRequest(ack_id="ack_id_string", byte_size=10, ordering_key="") + requests.NackRequest( + ack_id="ack_id_string", byte_size=10, ordering_key="", future=None + ) ] + manager.send_unary_modack.return_value = (items, []) dispatcher_.nack(items) - manager.send.assert_called_once_with( - gapic_types.StreamingPullRequest( - modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[0] - ) + 
manager.send_unary_modack.assert_called_once_with( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[0], + ack_reqs_dict={ + "ack_id_string": requests.ModAckRequest( + ack_id="ack_id_string", seconds=0, future=None + ) + }, ) @@ -241,13 +373,14 @@ def test_modify_ack_deadline(): ) dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) - items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60)] + items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60, future=None)] + manager.send_unary_modack.return_value = (items, []) dispatcher_.modify_ack_deadline(items) - manager.send.assert_called_once_with( - gapic_types.StreamingPullRequest( - modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[60] - ) + manager.send_unary_modack.assert_called_once_with( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[60], + ack_reqs_dict={"ack_id_string": items[0]}, ) @@ -259,21 +392,22 @@ def test_modify_ack_deadline_splitting_large_payload(): items = [ # use realistic lengths for ACK IDs (max 176 bytes) - requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60) + requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60, future=None) for i in range(5001) ] + manager.send_unary_modack.return_value = (items, []) dispatcher_.modify_ack_deadline(items) - calls = manager.send.call_args_list + calls = manager.send_unary_modack.call_args_list assert len(calls) == 3 all_ack_ids = {item.ack_id for item in items} sent_ack_ids = collections.Counter() for call in calls: - message = call.args[0] - assert message._pb.ByteSize() <= 524288 # server-side limit (2**19) - sent_ack_ids.update(message.modify_deadline_ack_ids) + modack_ackids = call[1]["modify_deadline_ack_ids"] + assert len(modack_ackids) <= dispatcher._ACK_IDS_BATCH_SIZE + sent_ack_ids.update(modack_ackids) assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once diff --git a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index 5411674c0..9f71109e7 100644 --- a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -19,6 +19,10 @@ from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager +from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeError, + AcknowledgeStatus, +) class TestStreamingPullFuture(object): @@ -76,3 +80,30 @@ def test_cancel(self): manager.close.assert_called_once() assert future.cancelled() + + +class TestFuture(object): + def test_cancel(self): + future = futures.Future() + assert future.cancel() is False + + def test_cancelled(self): + future = futures.Future() + assert future.cancelled() is False + + def test_result_on_success(self): + future = futures.Future() + future.set_result(AcknowledgeStatus.SUCCESS) + assert future.result() == AcknowledgeStatus.SUCCESS + + def test_result_on_failure(self): + future = futures.Future() + future.set_exception( + AcknowledgeError( + AcknowledgeStatus.PERMISSION_DENIED, "Something bad happened." + ) + ) + with pytest.raises(AcknowledgeError) as e: + future.result() + assert e.value.error_code == AcknowledgeStatus.PERMISSION_DENIED + assert e.value.info == "Something bad happened." 
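On the subscriber-facing side, the entry points for these futures are Message.ack_with_response, modify_ack_deadline_with_response, and nack_with_response, shown earlier in this patch. A usage sketch; the project/subscription path and the 60-second timeout are placeholders:

    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

    def callback(message):
        try:
            # On an exactly-once subscription the future resolves only after the
            # service has durably recorded the ack; otherwise it resolves at once.
            message.ack_with_response().result(timeout=60)
        except sub_exceptions.AcknowledgeError as e:
            # error_code is an AcknowledgeStatus member; the message may be redelivered.
            print(f"ack failed: {e.error_code}")

    subscriber = pubsub_v1.SubscriberClient()
    streaming_pull_future = subscriber.subscribe(
        "projects/my-project/subscriptions/my-sub", callback=callback
    )

Note that result() raises concurrent.futures.TimeoutError if the acknowledgement is still outstanding when the timeout elapses.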
diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index f389e5205..890c3c947 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -102,7 +102,7 @@ def test_maintain_leases_inactive_manager(caplog): leaser_.maintain_leases() # Leases should still be maintained even if the manager is inactive. - manager.dispatcher.modify_ack_deadline.assert_called() + manager._send_lease_modacks.assert_called() assert "exiting" in caplog.text @@ -138,9 +138,11 @@ def test_maintain_leases_ack_ids(): leaser_.maintain_leases() - manager.dispatcher.modify_ack_deadline.assert_called_once_with( - [requests.ModAckRequest(ack_id="my ack id", seconds=10)] - ) + assert len(manager._send_lease_modacks.mock_calls) == 1 + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["my ack id"] + assert call.args[1] == 10 def test_maintain_leases_no_ack_ids(): @@ -182,14 +184,11 @@ def test_maintain_leases_outdated_items(time): leaser_.maintain_leases() # ack2, ack3, and ack4 should be renewed. ack1 should've been dropped - modacks = manager.dispatcher.modify_ack_deadline.call_args.args[0] - expected = [ - requests.ModAckRequest(ack_id="ack2", seconds=10), - requests.ModAckRequest(ack_id="ack3", seconds=10), - requests.ModAckRequest(ack_id="ack4", seconds=10), - ] - # Use sorting to allow for ordering variance. - assert sorted(modacks) == sorted(expected) + assert len(manager._send_lease_modacks.mock_calls) == 1 + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["ack2", "ack3", "ack4"] + assert call.args[1] == 10 manager.dispatcher.drop.assert_called_once_with( [requests.DropRequest(ack_id="ack1", byte_size=50, ordering_key="")] diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index e3c14c93c..f5c7bf3c7 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -23,6 +23,7 @@ from google.cloud.pubsub_v1.subscriber._protocol import requests from google.protobuf import timestamp_pb2 from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus RECEIVED = datetime.datetime(2012, 4, 21, 15, 0, tzinfo=datetime.timezone.utc) @@ -32,7 +33,14 @@ PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000 -def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs): +def create_message( + data, + ack_id="ACKID", + delivery_attempt=0, + ordering_key="", + exactly_once_delivery_enabled=False, + **attrs +): with mock.patch.object(time, "time") as time_: time_.return_value = RECEIVED_SECONDS gapic_pubsub_message = gapic_types.PubsubMessage( @@ -51,6 +59,7 @@ def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", ** ack_id=ack_id, delivery_attempt=delivery_attempt, request_queue=queue.Queue(), + exactly_once_delivery_enabled_func=lambda: exactly_once_delivery_enabled, ) return msg @@ -127,6 +136,42 @@ def test_ack(): byte_size=30, time_to_ack=mock.ANY, ordering_key="", + future=None, + ) + ) + check_call_types(put, requests.AckRequest) + + +def test_ack_with_response_exactly_once_delivery_disabled(): + msg = create_message(b"foo", ack_id="bogus_ack_id") + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.ack_with_response() + put.assert_called_once_with( + 
requests.AckRequest( + ack_id="bogus_ack_id", + byte_size=30, + time_to_ack=mock.ANY, + ordering_key="", + future=None, + ) + ) + assert future.result() == AcknowledgeStatus.SUCCESS + check_call_types(put, requests.AckRequest) + + +def test_ack_with_response_exactly_once_delivery_enabled(): + msg = create_message( + b"foo", ack_id="bogus_ack_id", exactly_once_delivery_enabled=True + ) + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.ack_with_response() + put.assert_called_once_with( + requests.AckRequest( + ack_id="bogus_ack_id", + byte_size=30, + time_to_ack=mock.ANY, + ordering_key="", + future=future, ) ) check_call_types(put, requests.AckRequest) @@ -147,7 +192,30 @@ def test_modify_ack_deadline(): with mock.patch.object(msg._request_queue, "put") as put: msg.modify_ack_deadline(60) put.assert_called_once_with( - requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60) + requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) + ) + check_call_types(put, requests.ModAckRequest) + + +def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled(): + msg = create_message(b"foo", ack_id="bogus_ack_id") + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.modify_ack_deadline_with_response(60) + put.assert_called_once_with( + requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) + ) + assert future.result() == AcknowledgeStatus.SUCCESS + check_call_types(put, requests.ModAckRequest) + + +def test_modify_ack_deadline_with_response_exactly_once_delivery_enabled(): + msg = create_message( + b"foo", ack_id="bogus_ack_id", exactly_once_delivery_enabled=True + ) + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.modify_ack_deadline_with_response(60) + put.assert_called_once_with( + requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=future) ) check_call_types(put, requests.ModAckRequest) @@ -157,7 +225,36 @@ def test_nack(): with mock.patch.object(msg._request_queue, "put") as put: msg.nack() put.assert_called_once_with( - requests.NackRequest(ack_id="bogus_ack_id", byte_size=30, ordering_key="") + requests.NackRequest( + ack_id="bogus_ack_id", byte_size=30, ordering_key="", future=None + ) + ) + check_call_types(put, requests.NackRequest) + + +def test_nack_with_response_exactly_once_delivery_disabled(): + msg = create_message(b"foo", ack_id="bogus_ack_id") + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.nack_with_response() + put.assert_called_once_with( + requests.NackRequest( + ack_id="bogus_ack_id", byte_size=30, ordering_key="", future=None + ) + ) + assert future.result() == AcknowledgeStatus.SUCCESS + check_call_types(put, requests.NackRequest) + + +def test_nack_with_response_exactly_once_delivery_enabled(): + msg = create_message( + b"foo", ack_id="bogus_ack_id", exactly_once_delivery_enabled=True + ) + with mock.patch.object(msg._request_queue, "put") as put: + future = msg.nack_with_response() + put.assert_called_once_with( + requests.NackRequest( + ack_id="bogus_ack_id", byte_size=30, ordering_key="", future=future + ) ) check_call_types(put, requests.NackRequest) diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 797430e07..0d28ec447 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -21,7 +21,13 @@ def make_message(ack_id, ordering_key): proto_msg = 
gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key) - return message.Message(proto_msg._pb, ack_id, 0, queue.Queue()) + return message.Message( + proto_msg._pb, + ack_id, + 0, + queue.Queue(), + exactly_once_delivery_enabled_func=lambda: False, # pragma: NO COVER + ) def test_init(): diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 42c14c47d..9e8d6c5ed 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -33,8 +33,13 @@ from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager +from google.cloud.pubsub_v1.subscriber import exceptions as subscriber_exceptions +from google.cloud.pubsub_v1.subscriber import futures from google.pubsub_v1 import types as gapic_types import grpc +from google.rpc import status_pb2 +from google.rpc import code_pb2 +from google.rpc import error_details_pb2 @pytest.mark.parametrize( @@ -122,6 +127,16 @@ def make_manager(**kwargs): ) +def complete_modify_ack_deadline_calls(dispatcher): + def complete_futures(*args, **kwargs): + modack_requests = args[0] + for req in modack_requests: + if req.future: + req.future.set_result(subscriber_exceptions.AcknowledgeStatus.SUCCESS) + + dispatcher.modify_ack_deadline.side_effect = complete_futures + + def fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10): """Add a simplified fake add() method to a leaser instance. @@ -144,8 +159,11 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting(): manager = make_manager() - # Make sure that max_duration_per_lease_extension is disabled. - manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0) + # Make sure that min_duration_per_lease_extension and + # max_duration_per_lease_extension is disabled. + manager._flow_control = types.FlowControl( + min_duration_per_lease_extension=0, max_duration_per_lease_extension=0 + ) deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE @@ -175,6 +193,20 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension(): assert deadline == histogram.MIN_ACK_DEADLINE + 1 +def test__obtain_ack_deadline_with_min_duration_per_lease_extension(): + from google.cloud.pubsub_v1.subscriber._protocol import histogram + + manager = make_manager() + manager._flow_control = types.FlowControl( + min_duration_per_lease_extension=histogram.MAX_ACK_DEADLINE + ) + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE) # make p99 value small + + # The deadline configured in flow control should prevail. + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MAX_ACK_DEADLINE + + def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low(): from google.cloud.pubsub_v1.subscriber._protocol import histogram @@ -182,13 +214,58 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low(): manager._flow_control = types.FlowControl( max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE - 1 ) - manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should be adjusted to the minimum allowed. 
     deadline = manager._obtain_ack_deadline(maybe_update=True)
     assert deadline == histogram.MIN_ACK_DEADLINE


+def test__obtain_ack_deadline_with_min_duration_per_lease_extension_too_high():
+    from google.cloud.pubsub_v1.subscriber._protocol import histogram
+
+    manager = make_manager()
+    manager._flow_control = types.FlowControl(
+        min_duration_per_lease_extension=histogram.MAX_ACK_DEADLINE + 1
+    )
+
+    # The deadline configured in flow control should be adjusted to the maximum allowed.
+    deadline = manager._obtain_ack_deadline(maybe_update=True)
+    assert deadline == histogram.MAX_ACK_DEADLINE
+
+
+def test__obtain_ack_deadline_with_exactly_once_enabled():
+    manager = make_manager()
+    manager._flow_control = types.FlowControl(
+        min_duration_per_lease_extension=0  # leave as default value
+    )
+    manager._exactly_once_enabled = True
+    manager.ack_histogram.add(
+        10
+    )  # reduce p99 value below 60s min for exactly_once subscriptions
+
+    deadline = manager._obtain_ack_deadline(maybe_update=True)
+    # Since the 60-second minimum ack_deadline for exactly_once subscriptions
+    # is higher than the histogram value, the deadline should be 60 seconds.
+    assert deadline == 60
+
+
+def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly_once_enabled():
+    from google.cloud.pubsub_v1.subscriber._protocol import histogram
+
+    manager = make_manager()
+    manager._flow_control = types.FlowControl(
+        min_duration_per_lease_extension=histogram.MAX_ACK_DEADLINE
+    )
+    manager._exactly_once_enabled = True
+    manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE)  # make p99 value small
+
+    # The deadline configured in flow control should prevail.
+    deadline = manager._obtain_ack_deadline(maybe_update=True)
+    # User-defined custom min ack_deadline value takes precedence over
+    # exactly_once default of 60 seconds.
+ assert deadline == histogram.MAX_ACK_DEADLINE + + def test__obtain_ack_deadline_no_value_update(): manager = make_manager() @@ -426,21 +503,38 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog): assert manager._on_hold_bytes == 0 # should be auto-corrected -def test_send_unary(): +def test_send_unary_ack(): manager = make_manager() - manager.send( - gapic_types.StreamingPullRequest( - ack_ids=["ack_id1", "ack_id2"], - modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], - modify_deadline_seconds=[10, 20, 20], - ) - ) + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + } + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict) manager._client.acknowledge.assert_called_once_with( subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"] ) + +def test_send_unary_modack(): + manager = make_manager() + + ack_reqs_dict = { + "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=None), + "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=None), + "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=None), + } + manager.send_unary_modack( + modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], + modify_deadline_seconds=[10, 20, 20], + ack_reqs_dict=ack_reqs_dict, + ) + manager._client.modify_ack_deadline.assert_has_calls( [ mock.call( @@ -458,29 +552,61 @@ def test_send_unary(): ) -def test_send_unary_empty(): +def test_send_unary_ack_api_call_error(caplog): + caplog.set_level(logging.DEBUG) + manager = make_manager() - manager.send(gapic_types.StreamingPullRequest()) + error = exceptions.GoogleAPICallError("The front fell off") + manager._client.acknowledge.side_effect = error - manager._client.acknowledge.assert_not_called() - manager._client.modify_ack_deadline.assert_not_called() + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None + ), + } + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict) + + assert "The front fell off" in caplog.text -def test_send_unary_api_call_error(caplog): +def test_send_unary_modack_api_call_error(caplog): caplog.set_level(logging.DEBUG) manager = make_manager() error = exceptions.GoogleAPICallError("The front fell off") - manager._client.acknowledge.side_effect = error - - manager.send(gapic_types.StreamingPullRequest(ack_ids=["ack_id1", "ack_id2"])) + manager._client.modify_ack_deadline.side_effect = error + + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=futures.Future(), + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=futures.Future(), + ), + } + manager.send_unary_modack( + modify_deadline_ack_ids=["ack_id_string"], + modify_deadline_seconds=[0], + ack_reqs_dict=ack_reqs_dict, + ) assert "The front fell off" in caplog.text -def test_send_unary_retry_error(caplog): +def test_send_unary_ack_retry_error(caplog): caplog.set_level(logging.DEBUG) manager, _, _, _, _, _ = make_running_manager() @@ -490,11 +616,68 @@ def test_send_unary_retry_error(caplog): ) 
manager._client.acknowledge.side_effect = error + future1 = futures.Future() + future2 = futures.Future() + ack_reqs_dict = { + "ack_id1": requests.AckRequest( + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future1, + ), + "ack_id2": requests.AckRequest( + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future2, + ), + } + with pytest.raises(exceptions.RetryError): + manager.send_unary_ack( + ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict + ) + + assert "RetryError while sending unary RPC" in caplog.text + assert "signaled streaming pull manager shutdown" in caplog.text + assert isinstance(future1.exception(), subscriber_exceptions.AcknowledgeError) + assert ( + future1.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER + ) + assert isinstance(future2.exception(), subscriber_exceptions.AcknowledgeError) + assert ( + future2.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER + ) + + +def test_send_unary_modack_retry_error(caplog): + caplog.set_level(logging.DEBUG) + + manager, _, _, _, _, _ = make_running_manager() + + error = exceptions.RetryError( + "Too long a transient error", cause=Exception("Out of time!") + ) + manager._client.modify_ack_deadline.side_effect = error + + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future) + } with pytest.raises(exceptions.RetryError): - manager.send(gapic_types.StreamingPullRequest(ack_ids=["ack_id1", "ack_id2"])) + manager.send_unary_modack( + modify_deadline_ack_ids=["ackid1"], + modify_deadline_seconds=[0], + ack_reqs_dict=ack_reqs_dict, + ) assert "RetryError while sending unary RPC" in caplog.text assert "signaled streaming pull manager shutdown" in caplog.text + assert isinstance(future.exception(), subscriber_exceptions.AcknowledgeError) + assert ( + future.exception().error_code is subscriber_exceptions.AcknowledgeStatus.OTHER + ) def test_heartbeat(): @@ -519,6 +702,23 @@ def test_heartbeat_inactive(): assert not result +def test_heartbeat_stream_ack_deadline_seconds(): + manager = make_manager() + manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True) + manager._rpc.is_active = True + # Send new ack deadline with next heartbeat. + manager._send_new_ack_deadline = True + + result = manager.heartbeat() + + manager._rpc.send.assert_called_once_with( + gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10) + ) + assert result + # Set to false after a send is initiated. + assert not manager._send_new_ack_deadline + + @mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) @mock.patch("google.cloud.pubsub_v1.subscriber._protocol.leaser.Leaser", autospec=True) @@ -860,7 +1060,127 @@ def test__on_response_modifies_ack_deadline(): manager._on_response(response) dispatcher.modify_ack_deadline.assert_called_once_with( - [requests.ModAckRequest("ack_1", 18), requests.ModAckRequest("ack_2", 18)] + [ + requests.ModAckRequest("ack_1", 18, None), + requests.ModAckRequest("ack_2", 18, None), + ] + ) + + +def test__on_response_modifies_ack_deadline_with_exactly_once_min_lease(): + # exactly_once is disabled by default. 
+ manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + complete_modify_ack_deadline_calls(dispatcher) + + # make p99 value smaller than exactly_once min lease + manager.ack_histogram.add(10) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + + # Set up the response with the first set of messages and exactly_once not + # enabled. + response1 = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack_1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="ack_2", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + ), + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=False + ), + ) + + # Set up the response with the second set of messages and exactly_once enabled. + response2 = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack_3", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="ack_4", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + ), + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=True + ), + ) + + # exactly_once is still disabled b/c subscription_properties says so + manager._on_response(response1) + + # expect mod-acks are called with histogram-based lease value + assert len(dispatcher.modify_ack_deadline.mock_calls) == 1 + call = dispatcher.modify_ack_deadline.mock_calls[0] + assert call.args[0] == [ + requests.ModAckRequest("ack_1", 10, None), + requests.ModAckRequest("ack_2", 10, None), + ] + + # exactly_once should be enabled after this request b/c subscription_properties says so + manager._on_response(response2) + + # expect mod-acks called with 60 sec min lease value for exactly_once subscriptions + # ignore the futures here + assert len(dispatcher.modify_ack_deadline.mock_calls) == 2 + call = dispatcher.modify_ack_deadline.mock_calls[1] + modack_reqs = call.args[0] + assert modack_reqs[0].ack_id == "ack_3" + assert modack_reqs[0].seconds == 60 + assert modack_reqs[1].ack_id == "ack_4" + assert modack_reqs[1].seconds == 60 + + +def test__on_response_send_ack_deadline_after_enabling_exactly_once(): + # exactly_once is disabled by default. + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + complete_modify_ack_deadline_calls(dispatcher) + + # set up an active RPC + manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True) + manager._rpc.is_active = True + + # make p99 value smaller than exactly_once min lease + manager.ack_histogram.add(10) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + + # Set up the response with the a message and exactly_once enabled. 
+ response2 = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack_1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ) + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=True + ), + ) + + # exactly_once should be enabled after this request b/c subscription_properties says so + # when exactly_once is enabled or disabled, we send a new ack_deadline via + # the heartbeat + # should satisfy assertion 1 + manager._on_response(response2) + + # simulate periodic heartbeat trigger + heartbeat_request_sent = manager.heartbeat() + assert heartbeat_request_sent + + # heartbeat request is sent with the 60 sec min lease value for exactly_once subscriptions + manager._rpc.send.assert_called_once_with( + gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60) ) @@ -890,7 +1210,10 @@ def test__on_response_no_leaser_overload(): manager._on_response(response) dispatcher.modify_ack_deadline.assert_called_once_with( - [requests.ModAckRequest("fack", 10), requests.ModAckRequest("back", 10)] + [ + requests.ModAckRequest("fack", 10, None), + requests.ModAckRequest("back", 10, None), + ] ) schedule_calls = scheduler.schedule.mock_calls @@ -937,9 +1260,9 @@ def test__on_response_with_leaser_overload(): # deadline extended, even those not dispatched to callbacks dispatcher.modify_ack_deadline.assert_called_once_with( [ - requests.ModAckRequest("fack", 10), - requests.ModAckRequest("back", 10), - requests.ModAckRequest("zack", 10), + requests.ModAckRequest("fack", 10, None), + requests.ModAckRequest("back", 10, None), + requests.ModAckRequest("zack", 10, None), ] ) @@ -1017,9 +1340,9 @@ def test__on_response_with_ordering_keys(): # deadline extended, even those not dispatched to callbacks. dispatcher.modify_ack_deadline.assert_called_once_with( [ - requests.ModAckRequest("fack", 10), - requests.ModAckRequest("back", 10), - requests.ModAckRequest("zack", 10), + requests.ModAckRequest("fack", 10, None), + requests.ModAckRequest("back", 10, None), + requests.ModAckRequest("zack", 10, None), ] ) @@ -1058,6 +1381,109 @@ def test__on_response_with_ordering_keys(): assert manager._messages_on_hold.get() is None +def test__on_response_enable_exactly_once(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + complete_modify_ack_deadline_calls(dispatcher) + + # Set up the messages. 
+ response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="fack", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ) + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=True + ), + ) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + + # exactly_once should be enabled + manager._on_response(response) + + assert manager._exactly_once_delivery_enabled() + # new deadline for exactly_once subscriptions should be used + assert manager.ack_deadline == 60 + + +def test__on_response_disable_exactly_once(): + from google.cloud.pubsub_v1.subscriber._protocol import histogram + + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + manager._flow_control = types.FlowControl( + min_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + ) + # enable exactly_once + manager._exactly_once_enabled = True + + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="fack", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ) + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=False + ), + ) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + + # exactly_once should be disabled + manager._on_response(response) + + assert not manager._exactly_once_enabled + # The deadline configured in flow control should be used, not the + # exactly_once minimum since exactly_once has been disabled. + deadline = manager._obtain_ack_deadline(maybe_update=True) + assert deadline == histogram.MIN_ACK_DEADLINE + + +def test__on_response_exactly_once_immediate_modacks_fail(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + def complete_futures_with_error(*args, **kwargs): + modack_requests = args[0] + for req in modack_requests: + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.SUCCESS, None + ) + ) + + dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error + + # Set up the messages. 
+    response = gapic_types.StreamingPullResponse(
+        received_messages=[
+            gapic_types.ReceivedMessage(
+                ack_id="fack",
+                message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
+            )
+        ],
+        subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties(
+            exactly_once_delivery_enabled=True
+        ),
+    )
+
+    # adjust message bookkeeping in leaser
+    fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
+
+    # exactly_once should be enabled
+    manager._on_response(response)
+    # exceptions are logged, but otherwise no effect
+
+
 def test__should_recover_true():
     manager = make_manager()

@@ -1130,3 +1556,369 @@ def test_activate_ordering_keys_stopped_scheduler():
     manager.activate_ordering_keys(["key1", "key2"])

     manager._messages_on_hold.activate_ordering_keys.assert_not_called()
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+@mock.patch("google.protobuf.any_pb2.Any.Unpack")
+def test_get_ack_errors_unable_to_unpack(unpack, from_call):
+    st = status_pb2.Status()
+    st.code = code_pb2.Code.INTERNAL
+    st.message = "qmsg"
+    error_info = error_details_pb2.ErrorInfo()
+    error_info.metadata["ack_1"] = "error1"
+    st.details.add().Pack(error_info)
+    mock_gprc_call = mock.Mock(spec=grpc.Call)
+    exception = exceptions.InternalServerError(
+        "msg", errors=(), response=mock_gprc_call
+    )
+    from_call.return_value = st
+    # Unpack() failed
+    unpack.return_value = None
+
+    assert not streaming_pull_manager._get_ack_errors(exception)
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+def test_get_ack_errors_no_response_obj(from_call):
+    exception = exceptions.InternalServerError("msg", errors=(), response=None)
+    # No response obj
+    assert not streaming_pull_manager._get_ack_errors(exception)
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+def test_get_ack_errors_from_call_returned_none(from_call):
+    mock_gprc_call = mock.Mock(spec=grpc.Call)
+    exception = exceptions.InternalServerError(
+        "msg", errors=(), response=mock_gprc_call
+    )
+    from_call.return_value = None
+    # rpc_status.from_call() returned None
+    assert not streaming_pull_manager._get_ack_errors(exception)
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+def test_get_ack_errors_value_error_thrown(from_call):
+    mock_gprc_call = mock.Mock(spec=grpc.Call)
+    exception = exceptions.InternalServerError(
+        "msg", errors=(), response=mock_gprc_call
+    )
+    from_call.side_effect = ValueError("val error msg")
+    # ValueError thrown, so return None
+    assert not streaming_pull_manager._get_ack_errors(exception)
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+def test_get_ack_errors_no_error_details(from_call):
+    st = status_pb2.Status()
+    st.code = code_pb2.Code.INTERNAL
+    st.message = "qmsg"
+    mock_gprc_call = mock.Mock(spec=grpc.Call)
+    exception = exceptions.InternalServerError(
+        "msg", errors=(), response=mock_gprc_call
+    )
+    from_call.side_effect = None
+    from_call.return_value = st
+    # status has no details to extract exactly-once error info from
+    assert not streaming_pull_manager._get_ack_errors(exception)
+
+
+@mock.patch("grpc_status.rpc_status.from_call")
+def test_get_ack_errors_detail_not_error_info(from_call):
+    st = status_pb2.Status()
+    st.code = code_pb2.Code.INTERNAL
+    st.message = "qmsg"
+    # pack a dummy status instead of an ErrorInfo
+    dummy_status = status_pb2.Status()
+    st.details.add().Pack(dummy_status)
+    mock_gprc_call = mock.Mock(spec=grpc.Call)
+    exception = exceptions.InternalServerError(
+        "msg", errors=(), response=mock_gprc_call
+    )
+    from_call.side_effect = None
+
from_call.return_value = st + assert not streaming_pull_manager._get_ack_errors(exception) + + +@mock.patch("grpc_status.rpc_status.from_call") +def test_get_ack_errors_happy_case(from_call): + st = status_pb2.Status() + st.code = code_pb2.Code.INTERNAL + st.message = "qmsg" + error_info = error_details_pb2.ErrorInfo() + error_info.metadata["ack_1"] = "error1" + st.details.add().Pack(error_info) + mock_gprc_call = mock.Mock(spec=grpc.Call) + exception = exceptions.InternalServerError( + "msg", errors=(), response=mock_gprc_call + ) + from_call.side_effect = None + from_call.return_value = st + # happy case - errors returned in a map + ack_errors = streaming_pull_manager._get_ack_errors(exception) + assert ack_errors + assert ack_errors["ack_1"] == "error1" + + +def test_process_requests_no_requests(): + # no requests so no items in results lists + ack_reqs_dict = {} + errors_dict = {} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert not requests_completed + assert not requests_to_retry + + +def test_process_requests_error_dict_is_none(): + # it's valid to pass in `None` for `errors_dict` + ack_reqs_dict = {} + errors_dict = None + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert not requests_completed + assert not requests_to_retry + + +def test_process_requests_no_errors_has_no_future(): + # no errors so request should be completed, even with no future + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ) + } + errors_dict = {} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + assert not requests_to_retry + + +def test_process_requests_no_errors(): + # no errors so request and its future should be completed + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + errors_dict = {} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS + assert not requests_to_retry + + +def test_process_requests_permanent_error_raises_exception(): + # a permanent error raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + errors_dict = {"ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID"} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert ( + exc_info.value.error_code + == subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID + ) + assert not requests_to_retry + + +def test_process_requests_transient_error_returns_request(): + # a transient error returns the request in `requests_to_retry` + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + errors_dict = {"ackid1": 
"TRANSIENT_FAILURE_INVALID_ACK_ID"} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert not requests_completed + assert requests_to_retry[0].ack_id == "ackid1" + assert not future.done() + + +def test_process_requests_unknown_error_raises_exception(): + # an unknown error raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + errors_dict = {"ackid1": "unknown_error"} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert exc_info.value.error_code == subscriber_exceptions.AcknowledgeStatus.OTHER + assert exc_info.value.info == "unknown_error" + assert not requests_to_retry + + +def test_process_requests_permission_denied_error_status_raises_exception(): + # a permission-denied error status raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + st = status_pb2.Status() + st.code = code_pb2.Code.PERMISSION_DENIED + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert ( + exc_info.value.error_code + == subscriber_exceptions.AcknowledgeStatus.PERMISSION_DENIED + ) + assert exc_info.value.info is None + assert not requests_to_retry + + +def test_process_requests_failed_precondition_error_status_raises_exception(): + # a failed-precondition error status raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + st = status_pb2.Status() + st.code = code_pb2.Code.FAILED_PRECONDITION + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert ( + exc_info.value.error_code + == subscriber_exceptions.AcknowledgeStatus.FAILED_PRECONDITION + ) + assert exc_info.value.info is None + assert not requests_to_retry + + +def test_process_requests_other_error_status_raises_exception(): + # an unrecognized error status raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + st = status_pb2.Status() + st.code = code_pb2.Code.OUT_OF_RANGE + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert exc_info.value.error_code == subscriber_exceptions.AcknowledgeStatus.OTHER + assert not requests_to_retry + + +def test_process_requests_mixed_success_and_failure_acks(): + # mixed success and failure (acks) + future1 = futures.Future() + future2 = futures.Future() + future3 = futures.Future() 
+ ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future1, + ), + "ackid2": requests.AckRequest( + ack_id="ackid2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future2, + ), + "ackid3": requests.AckRequest( + ack_id="ackid3", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=future3, + ), + } + errors_dict = { + "ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID", + "ackid2": "TRANSIENT_FAILURE_INVALID_ACK_ID", + } + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + # message with ack_id 'ackid1' fails with an exception + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future1.result() + assert ( + exc_info.value.error_code + == subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID + ) + # message with ack_id 'ackid2' is to be retried + assert requests_to_retry[0].ack_id == "ackid2" + assert not requests_to_retry[0].future.done() + # message with ack_id 'ackid3' succeeds + assert requests_completed[1].ack_id == "ackid3" + assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS + + +def test_process_requests_mixed_success_and_failure_modacks(): + # mixed success and failure (modacks) + future1 = futures.Future() + future2 = futures.Future() + future3 = futures.Future() + ack_reqs_dict = { + "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future1), + "ackid2": requests.ModAckRequest(ack_id="ackid2", seconds=60, future=future2), + "ackid3": requests.ModAckRequest(ack_id="ackid3", seconds=60, future=future3), + } + errors_dict = { + "ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID", + "ackid2": "TRANSIENT_FAILURE_INVALID_ACK_ID", + } + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + # message with ack_id 'ackid1' fails with an exception + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future1.result() + assert ( + exc_info.value.error_code + == subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID + ) + # message with ack_id 'ackid2' is to be retried + assert requests_to_retry[0].ack_id == "ackid2" + assert not requests_to_retry[0].future.done() + # message with ack_id 'ackid3' succeeds + assert requests_completed[1].ack_id == "ackid3" + assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index 1f60b536d..793ceca3c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -205,7 +205,7 @@ def test_context_manager_raises_if_closed(creds): expetect_msg = r"(?i).*closed.*cannot.*context manager.*" with pytest.raises(RuntimeError, match=expetect_msg): with client: - pass + pass # pragma: NO COVER def test_api_property_deprecated(creds):
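
The _process_requests tests above fix the failure taxonomy for exactly-once acknowledgements: TRANSIENT_FAILURE_* error strings leave the request in requests_to_retry with its future still pending (the client library retries it rather than surfacing the error), while PERMANENT_FAILURE_INVALID_ACK_ID, unknown error strings, and error statuses such as PERMISSION_DENIED or FAILED_PRECONDITION settle the future with an AcknowledgeError. A rough caller-side sketch of leaning on that split while extending a lease (the 600-second extension, 30-second timeout, and process() helper are illustrative placeholders, not part of this patch):

    from google.cloud.pubsub_v1.subscriber.exceptions import (
        AcknowledgeError,
        AcknowledgeStatus,
    )

    def callback(message):
        try:
            # Transient failures are retried internally, so result() returns
            # AcknowledgeStatus.SUCCESS or raises on a permanent failure.
            message.modify_ack_deadline_with_response(600).result(timeout=30)
            process(message)  # placeholder for long-running work
            message.ack_with_response().result(timeout=30)
        except AcknowledgeError as exc:
            if exc.error_code == AcknowledgeStatus.INVALID_ACK_ID:
                # The ack ID is no longer valid; the message will be redelivered.
                return
            message.nack_with_response()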