feat: exactly-once delivery support (#550)
pradn committed Mar 4, 2022
1 parent ee7286e commit 2fb6e15
Showing 18 changed files with 1,837 additions and 154 deletions.
1 change: 1 addition & 0 deletions google/cloud/pubsub_v1/proto/pubsub.proto
@@ -1164,6 +1164,7 @@ message StreamingPullRequest {
message StreamingPullResponse {
// Subscription properties sent as part of the response.
message SubscriptionProperties {
    // True iff exactly once delivery is enabled for this subscription.
    bool exactly_once_delivery_enabled = 1;
// True iff message ordering is enabled for this subscription.
bool message_ordering_enabled = 2;
}
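Exactly-once delivery is a per-subscription setting; once it is turned on, the service reports it back to streaming-pull clients through the `SubscriptionProperties` message above. A minimal sketch of creating such a subscription with the admin surface follows; it is not part of this commit, assumes the `enable_exactly_once_delivery` field on `Subscription`, and uses placeholder project/topic/subscription names.

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path("my-project", "my-topic")  # placeholder names
subscription_path = subscriber.subscription_path("my-project", "my-sub")

with subscriber:
    subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            # Assumed admin field that opts the subscription into exactly-once delivery.
            "enable_exactly_once_delivery": True,
        }
    )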
145 changes: 130 additions & 15 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -15,17 +15,19 @@
from __future__ import absolute_import
from __future__ import division

import functools
import itertools
import logging
import math
import time
import threading
import typing
from typing import List, Optional, Sequence, Union
import warnings
from google.api_core.retry import exponential_sleep_generator

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
import queue
@@ -66,6 +68,14 @@
IDs at a time.
"""

_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
delivery is enabled."""

_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once delivery is enabled."""
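The retry loops added below feed these two bounds straight into `google.api_core`'s `exponential_sleep_generator`, so failed acks and modacks are retried after roughly one second at first, then with delays that grow roughly exponentially (with jitter) toward the ten-minute maximum. A quick sketch (not part of this commit) that samples that schedule:

from google.api_core.retry import exponential_sleep_generator

# Sample a dozen delays from the same generator the retry helpers use.
delays = exponential_sleep_generator(initial=1, maximum=10 * 60)
print([round(next(delays), 1) for _ in range(12)])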


class Dispatcher(object):
def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
Expand Down Expand Up @@ -168,17 +178,66 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:

# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
ack_reqs_dict=ack_reqs_dict,
)

# Remove the completed messages from lease management.
self.drop(requests_completed)

# Retry on a separate thread so the dispatcher thread isn't blocked
# by sleeps.
if requests_to_retry:
self._start_retry_thread(
"Thread-RetryAcks",
functools.partial(self._retry_acks, requests_to_retry),
)
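One detail worth noting in the loop above: `items_gen` and `ack_ids_gen` are single iterators that `itertools.islice` consumes in place, so each pass of the `for` loop picks up the next batch of at most `_ACK_IDS_BATCH_SIZE` requests without rescanning the sequence. A standalone illustration of the idiom (hypothetical helper, not part of this commit):

import itertools
import math

def chunked(items, batch_size):
    # Consume one shared iterator in fixed-size slices.
    items_gen = iter(items)
    total_chunks = int(math.ceil(len(items) / batch_size))
    for _ in range(total_chunks):
        yield list(itertools.islice(items_gen, batch_size))

print(list(chunked(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]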

def _start_retry_thread(self, thread_name, thread_target):
# note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue.
# https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
# https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
retry_thread = threading.Thread(
name=thread_name, target=thread_target, daemon=True,
)
# The thread finishes when the requests succeed or eventually fail with
# a back-end timeout error or other permanent failure.
retry_thread.start()

def _retry_acks(self, requests_to_retry):
retry_delay_gen = exponential_sleep_generator(
initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
)
while requests_to_retry:
time_to_wait = next(retry_delay_gen)
_LOGGER.debug(
"Retrying {len(requests_to_retry)} ack(s) after delay of "
+ str(time_to_wait)
+ " seconds"
)
time.sleep(time_to_wait)

ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."
# Remove the completed messages from lease management.
self.drop(requests_completed)
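The `future` attached to each request (added to the request tuples in `requests.py` further down) is how ack outcomes are surfaced to callers under exactly-once delivery; the manager presumably resolves futures for requests that complete and hands back the transient failures for the retry loop above. Those manager-side details are outside this hunk, so the following is only a hypothetical stand-in, using `concurrent.futures.Future` instead of the library's own future type, to show the completed/retry split the dispatcher consumes:

from concurrent.futures import Future
from typing import List, NamedTuple, Optional, Tuple

class FakeAckRequest(NamedTuple):
    # Hypothetical stand-in for requests.AckRequest.
    ack_id: str
    future: Optional[Future]

def fake_send_unary_ack(
    reqs: List[FakeAckRequest],
) -> Tuple[List[FakeAckRequest], List[FakeAckRequest]]:
    """Pretend to ack a batch: resolve futures on success, hand back transient failures."""
    completed, to_retry = [], []
    for req in reqs:
        if req.ack_id.endswith("-transient"):  # simulate a transient failure
            to_retry.append(req)
        else:
            if req.future is not None:
                req.future.set_result("SUCCESS")  # placeholder success value
            completed.append(req)
    return completed, to_retry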

def drop(
self,
@@ -215,16 +274,58 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
"""
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
deadline_seconds_gen = (item.seconds for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
# no further work needs to be done for `requests_to_retry`
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
),
modify_deadline_seconds=list(
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
),
ack_reqs_dict=ack_reqs_dict,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."

# Retry on a separate thread so the dispatcher thread isn't blocked
# by sleeps.
if requests_to_retry:
self._start_retry_thread(
"Thread-RetryModAcks",
functools.partial(self._retry_modacks, requests_to_retry),
)

def _retry_modacks(self, requests_to_retry):
retry_delay_gen = exponential_sleep_generator(
initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
)
while requests_to_retry:
time_to_wait = next(retry_delay_gen)
_LOGGER.debug(
"Retrying {len(requests_to_retry)} modack(s) after delay of "
+ str(time_to_wait)
+ " seconds"
)
time.sleep(time_to_wait)

ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
modify_deadline_seconds=[req.seconds for req in requests_to_retry],
ack_reqs_dict=ack_reqs_dict,
)

def nack(self, items: Sequence[requests.NackRequest]) -> None:
"""Explicitly deny receipt of messages.
@@ -233,6 +334,20 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None:
items: The items to deny.
"""
self.modify_ack_deadline(
[
requests.ModAckRequest(
ack_id=item.ack_id, seconds=0, future=item.future
)
for item in items
]
)
self.drop(
[
requests.DropRequest(
ack_id=item.ack_id,
byte_size=item.byte_size,
ordering_key=item.ordering_key,
)
for item in items
]
)
7 changes: 3 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
@@ -181,7 +181,7 @@ def maintain_leases(self) -> None:
for item in to_drop:
leased_messages.pop(item.ack_id)

# Create a modack request.
# We do not actually call `modify_ack_deadline` over and over
# because it is more efficient to make a single request.
ack_ids = leased_messages.keys()
@@ -194,9 +194,8 @@
# way for ``send_request`` to fail when the consumer
# is inactive.
assert self._manager.dispatcher is not None
ack_id_gen = (ack_id for ack_id in ack_ids)
self._manager._send_lease_modacks(ack_id_gen, deadline)

# Now wait an appropriate period of time and do this again.
#
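`_send_lease_modacks` is a `StreamingPullManager` helper that is not shown in this hunk. One plausible shape, assuming that plain lease extensions have no caller waiting on their outcome and can therefore pass `future=None`, is sketched below as a hypothetical standalone function:

from google.cloud.pubsub_v1.subscriber._protocol import requests

def send_lease_modacks_sketch(manager, ack_ids, deadline):
    # Hypothetical sketch only; the real helper lives on StreamingPullManager.
    items = [
        requests.ModAckRequest(ack_id=ack_id, seconds=deadline, future=None)
        for ack_id in ack_ids
    ]
    assert manager.dispatcher is not None
    manager.dispatcher.modify_ack_deadline(items)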
7 changes: 7 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
@@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
from typing import NamedTuple, Optional

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1.subscriber import futures


# Namedtuples for management requests. Used by the Message class to communicate
# items of work back to the policy.
@@ -22,6 +26,7 @@ class AckRequest(NamedTuple):
byte_size: int
time_to_ack: float
ordering_key: Optional[str]
future: Optional["futures.Future"]


class DropRequest(NamedTuple):
@@ -39,9 +44,11 @@ class LeaseRequest(NamedTuple):
class ModAckRequest(NamedTuple):
ack_id: str
seconds: float
future: Optional["futures.Future"]


class NackRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
future: Optional["futures.Future"]
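Taken together, the optional futures on these request tuples are what ultimately let application code observe ack and nack outcomes when exactly-once delivery is enabled. An end-user sketch follows; it assumes the public `Message.ack_with_response()` API and the `subscriber.exceptions.AcknowledgeError` type (which may have landed alongside or shortly after this change) and uses placeholder resource names:

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-sub")  # placeholders

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Under exactly-once delivery, ack_with_response() returns a future that
    # resolves once the acknowledgement is durably processed by the service.
    ack_future = message.ack_with_response()
    try:
        ack_future.result(timeout=60)
        print(f"Ack succeeded for message {message.message_id}")
    except sub_exceptions.AcknowledgeError as e:
        print(f"Ack failed for message {message.message_id}: {e.error_code}")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result(timeout=30)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()  # block until the shutdown completes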
