Skip to content

Commit

Permalink
Merge branch 'release/v2.7.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Feb 27, 2024
2 parents 98beca6 + 10d2cc6 commit ae09ea4
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 107 deletions.
2 changes: 1 addition & 1 deletion nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
)
from .transport import TcpTransport, Transport, WebSocketTransport

__version__ = '2.7.0'
__version__ = '2.7.2'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down
10 changes: 9 additions & 1 deletion nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ def as_dict(self) -> Dict[str, object]:
if self.sources:
result['sources'] = [src.as_dict() for src in self.sources]
if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2):
raise ValueError("nats: invalid store compression type: %s" % self.compression)
raise ValueError(
"nats: invalid store compression type: %s" % self.compression
)
if self.metadata and not isinstance(self.metadata, dict):
raise ValueError("nats: invalid metadata format")
return result
Expand Down Expand Up @@ -439,6 +441,7 @@ class ConsumerConfig(Base):
ack_policy: Optional[AckPolicy] = AckPolicy.EXPLICIT
ack_wait: Optional[float] = None # in seconds
max_deliver: Optional[int] = None
backoff: Optional[List[float]] = None # in seconds, overrides ack_wait
filter_subject: Optional[str] = None
filter_subjects: Optional[List[str]] = None
replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT
Expand All @@ -452,6 +455,7 @@ class ConsumerConfig(Base):

# Push based consumers.
deliver_subject: Optional[str] = None
# Push based queue consumers.
deliver_group: Optional[str] = None

# Ephemeral inactivity threshold
Expand All @@ -472,6 +476,8 @@ def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, 'ack_wait')
cls._convert_nanoseconds(resp, 'idle_heartbeat')
cls._convert_nanoseconds(resp, 'inactive_threshold')
if 'backoff' in resp:
resp['backoff'] = [val / _NANOSECOND for val in resp['backoff']]
return super().from_response(resp)

def as_dict(self) -> Dict[str, object]:
Expand All @@ -481,6 +487,8 @@ def as_dict(self) -> Dict[str, object]:
result['inactive_threshold'] = self._to_nanoseconds(
self.inactive_threshold
)
if self.backoff:
result['backoff'] = [self._to_nanoseconds(i) for i in self.backoff]
return result


Expand Down
123 changes: 99 additions & 24 deletions nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from nats.js import api
from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError
from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError, FetchTimeoutError
from nats.js.kv import KeyValue
from nats.js.manager import JetStreamManager
from nats.js.object_store import (
Expand Down Expand Up @@ -547,6 +547,13 @@ def _is_temporary_error(cls, status: Optional[str]) -> bool:
else:
return False

@classmethod
def _is_heartbeat(cls, status: Optional[str]) -> bool:
if status == api.StatusCode.CONTROL_MESSAGE:
return True
else:
return False

@classmethod
def _time_until(cls, timeout: Optional[float],
start_time: float) -> Optional[float]:
Expand Down Expand Up @@ -620,9 +627,7 @@ async def activity_check(self):
self._active = False
if not active:
if self._ordered:
await self.reset_ordered_consumer(
self._sseq + 1
)
await self.reset_ordered_consumer(self._sseq + 1)
except asyncio.CancelledError:
break

Expand Down Expand Up @@ -882,14 +887,18 @@ async def consumer_info(self) -> api.ConsumerInfo:
)
return info

async def fetch(self,
batch: int = 1,
timeout: Optional[float] = 5) -> List[Msg]:
async def fetch(
self,
batch: int = 1,
timeout: Optional[float] = 5,
heartbeat: Optional[float] = None
) -> List[Msg]:
"""
fetch makes a request to JetStream to be delivered a set of messages.
:param batch: Number of messages to fetch from server.
:param timeout: Max duration of the fetch request before it expires.
:param heartbeat: Idle Heartbeat interval in seconds for the fetch request.
::
Expand Down Expand Up @@ -925,15 +934,16 @@ async def main():
timeout * 1_000_000_000
) - 100_000 if timeout else None
if batch == 1:
msg = await self._fetch_one(expires, timeout)
msg = await self._fetch_one(expires, timeout, heartbeat)
return [msg]
msgs = await self._fetch_n(batch, expires, timeout)
msgs = await self._fetch_n(batch, expires, timeout, heartbeat)
return msgs

async def _fetch_one(
self,
expires: Optional[int],
timeout: Optional[float],
heartbeat: Optional[float] = None
) -> Msg:
queue = self._sub._pending_queue

Expand All @@ -957,37 +967,66 @@ async def _fetch_one(
next_req['batch'] = 1
if expires:
next_req['expires'] = int(expires)
if heartbeat:
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000
) # to nanoseconds

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
self._deliver,
)

# Wait for the response or raise timeout.
msg = await self._sub.next_msg(timeout)

# Should have received at least a processable message at this point,
status = JetStreamContext.is_status_msg(msg)
start_time = time.monotonic()
got_any_response = False
while True:
try:
deadline = JetStreamContext._time_until(
timeout, start_time
)
# Wait for the response or raise timeout.
msg = await self._sub.next_msg(timeout=deadline)

if status:
# In case of a temporary error, treat it as a timeout to retry.
if JetStreamContext._is_temporary_error(status):
raise nats.errors.TimeoutError
else:
# Any other type of status message is an error.
raise nats.js.errors.APIError.from_msg(msg)
return msg
# Should have received at least a processable message at this point,
status = JetStreamContext.is_status_msg(msg)
if status:
if JetStreamContext._is_heartbeat(status):
got_any_response = True
continue

# In case of a temporary error, treat it as a timeout to retry.
if JetStreamContext._is_temporary_error(status):
raise nats.errors.TimeoutError
else:
# Any other type of status message is an error.
raise nats.js.errors.APIError.from_msg(msg)
else:
return msg
except asyncio.TimeoutError:
deadline = JetStreamContext._time_until(
timeout, start_time
)
if deadline is not None and deadline < 0:
# No response from the consumer could have been
# due to a reconnect while the fetch request,
# the JS API not responding on time, or maybe
# there were no messages yet.
if got_any_response:
raise FetchTimeoutError
raise

async def _fetch_n(
self,
batch: int,
expires: Optional[int],
timeout: Optional[float],
heartbeat: Optional[float] = None
) -> List[Msg]:
msgs = []
queue = self._sub._pending_queue
start_time = time.monotonic()
got_any_response = False
needed = batch

# Fetch as many as needed from the internal pending queue.
Expand All @@ -1013,6 +1052,10 @@ async def _fetch_n(
next_req['batch'] = needed
if expires:
next_req['expires'] = expires
if heartbeat:
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000
) # to nanoseconds
next_req['no_wait'] = True
await self._nc.publish(
self._nms,
Expand All @@ -1024,12 +1067,20 @@ async def _fetch_n(
try:
msg = await self._sub.next_msg(timeout)
except asyncio.TimeoutError:
# Return any message that was already available in the internal queue.
if msgs:
return msgs
raise

got_any_response = False

status = JetStreamContext.is_status_msg(msg)
if JetStreamContext._is_processable_msg(status, msg):
if JetStreamContext._is_heartbeat(status):
# Mark that we got any response from the server so this is not
# a possible i/o timeout error or due to a disconnection.
got_any_response = True
pass
elif JetStreamContext._is_processable_msg(status, msg):
# First processable message received, do not raise error from now.
msgs.append(msg)
needed -= 1
Expand All @@ -1045,6 +1096,10 @@ async def _fetch_n(
# No more messages after this so fallthrough
# after receiving the rest.
break
elif JetStreamContext._is_heartbeat(status):
# Skip heartbeats.
got_any_response = True
continue
elif JetStreamContext._is_processable_msg(status, msg):
needed -= 1
msgs.append(msg)
Expand All @@ -1063,6 +1118,11 @@ async def _fetch_n(
next_req['batch'] = needed
if expires:
next_req['expires'] = expires
if heartbeat:
next_req['idle_heartbeat'] = int(
heartbeat * 1_000_000_000
) # to nanoseconds

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
Expand All @@ -1083,7 +1143,12 @@ async def _fetch_n(
if len(msgs) == 0:
# Not a single processable message has been received so far,
# if this timed out then let the error be raised.
msg = await self._sub.next_msg(timeout=deadline)
try:
msg = await self._sub.next_msg(timeout=deadline)
except asyncio.TimeoutError:
if got_any_response:
raise FetchTimeoutError
raise
else:
try:
msg = await self._sub.next_msg(timeout=deadline)
Expand All @@ -1093,6 +1158,10 @@ async def _fetch_n(

if msg:
status = JetStreamContext.is_status_msg(msg)
if JetStreamContext._is_heartbeat(status):
got_any_response = True
continue

if not status:
needed -= 1
msgs.append(msg)
Expand All @@ -1116,6 +1185,9 @@ async def _fetch_n(

msg = await self._sub.next_msg(timeout=deadline)
status = JetStreamContext.is_status_msg(msg)
if JetStreamContext._is_heartbeat(status):
got_any_response = True
continue
if JetStreamContext._is_processable_msg(status, msg):
needed -= 1
msgs.append(msg)
Expand All @@ -1124,6 +1196,9 @@ async def _fetch_n(
# at least one message has already arrived.
pass

if len(msgs) == 0 and got_any_response:
raise FetchTimeoutError

return msgs

######################
Expand Down
11 changes: 10 additions & 1 deletion nats/js/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016-2022 The NATS Authors
# Copyright 2016-2024 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -133,6 +133,15 @@ def __str__(self) -> str:
return "nats: no response from stream"


class FetchTimeoutError(nats.errors.TimeoutError):
"""
Raised if the consumer timed out waiting for messages.
"""

def __str__(self) -> str:
return "nats: fetch timeout"


class ConsumerSequenceMismatchError(Error):
"""
Async error raised by the client with idle_heartbeat mode enabled
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# These are here for GitHub's dependency graph and help with setuptools support in some environments.
setup(
name="nats-py",
version='2.7.0',
version='2.7.2',
license='Apache 2 License',
extras_require={
'nkeys': ['nkeys'],
Expand Down

0 comments on commit ae09ea4

Please sign in to comment.