Skip to content

Commit

Permalink
Merge pull request #332 from nats-io/js-subscribe-limits
Browse files Browse the repository at this point in the history
Fixes to pending_bytes_limit handling
  • Loading branch information
wallyqs committed Aug 15, 2022
2 parents 8703520 + c98efaa commit 4269801
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 25 deletions.
3 changes: 2 additions & 1 deletion nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
Subscription,
)

__version__ = '2.1.4'
__version__ = '2.1.5'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down Expand Up @@ -1644,6 +1644,7 @@ async def _process_msg(
return
sub._pending_queue.put_nowait(msg)
except asyncio.QueueFull:
sub._pending_size -= len(msg.data)
await self._error_cb(
errors.SlowConsumerError(
subject=msg.subject, reply=msg.reply, sid=sid, sub=sub
Expand Down
11 changes: 11 additions & 0 deletions nats/aio/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ def pending_msgs(self) -> int:
"""
return self._pending_queue.qsize()

@property
def pending_bytes(self) -> int:
"""
Size of data sent by the NATS Server that is being buffered
in the pending queue.
"""
return self._pending_size

@property
def delivered(self) -> int:
"""
Expand All @@ -152,6 +160,7 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:

async def _next_msg() -> None:
msg = await self._pending_queue.get()
self._pending_size -= len(msg.data)
future.set_result(msg)

task = asyncio.get_running_loop().create_task(_next_msg())
Expand All @@ -165,6 +174,8 @@ async def _next_msg() -> None:
except asyncio.CancelledError:
future.cancel()
task.cancel()
# Call timeout otherwise would get an empty message.
raise errors.TimeoutError

def _start(self, error_cb):
"""
Expand Down
51 changes: 49 additions & 2 deletions nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
KV_PRE_TEMPLATE = "$KV.{bucket}."
Callback = Callable[['Msg'], Awaitable[None]]

# For JetStream the default pending limits are larger.
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024


class JetStreamContext(JetStreamManager):
"""
Expand Down Expand Up @@ -136,6 +140,9 @@ async def subscribe(
ordered_consumer: bool = False,
idle_heartbeat: Optional[float] = None,
flow_control: bool = False,
pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: Optional[int
] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
) -> Subscription:
"""Create consumer if needed and push-subscribe to it.
Expand Down Expand Up @@ -289,6 +296,8 @@ async def cb(msg):
manual_ack=manual_ack,
ordered_consumer=ordered_consumer,
consumer=consumer,
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit,
)

async def subscribe_bind(
Expand All @@ -299,6 +308,9 @@ async def subscribe_bind(
cb: Optional[Callback] = None,
manual_ack: bool = False,
ordered_consumer: bool = False,
pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: Optional[int
] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
) -> Subscription:
"""Push-subscribe to an existing consumer.
"""
Expand All @@ -312,6 +324,8 @@ async def subscribe_bind(
subject=config.deliver_subject,
queue=config.deliver_group or "",
cb=cb,
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit,
)
psub = JetStreamContext.PushSubscription(self, sub, stream, consumer)

Expand Down Expand Up @@ -345,6 +359,9 @@ async def pull_subscribe(
durable: str,
stream: Optional[str] = None,
config: Optional[api.ConsumerConfig] = None,
pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: Optional[int
] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
) -> "JetStreamContext.PullSubscription":
"""Create consumer and pull subscription.
Expand Down Expand Up @@ -389,13 +406,21 @@ async def main():
config.durable_name = durable
await self._jsm.add_consumer(stream, config=config)

return await self.pull_subscribe_bind(durable=durable, stream=stream)
return await self.pull_subscribe_bind(
durable=durable,
stream=stream,
pending_bytes_limit=pending_bytes_limit,
pending_msgs_limit=pending_msgs_limit,
)

async def pull_subscribe_bind(
self,
durable: str,
stream: str,
inbox_prefix: bytes = api.INBOX_PREFIX,
pending_msgs_limit: Optional[int] = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: Optional[int
] = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
) -> "JetStreamContext.PullSubscription":
"""
pull_subscribe returns a `PullSubscription` that can be delivered messages
Expand Down Expand Up @@ -424,7 +449,11 @@ async def main():
"""
deliver = inbox_prefix + self._nc._nuid.next()
sub = await self._nc.subscribe(deliver.decode())
sub = await self._nc.subscribe(
deliver.decode(),
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit
)
return JetStreamContext.PullSubscription(
js=self,
sub=sub,
Expand Down Expand Up @@ -651,6 +680,22 @@ def __init__(
self._nms = f'{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}'
self._deliver = deliver.decode()

@property
def pending_msgs(self) -> int:
"""
Number of delivered messages by the NATS Server that are being buffered
in the pending queue.
"""
return self._sub._pending_queue.qsize()

@property
def pending_bytes(self) -> int:
"""
Size of data sent by the NATS Server that is being buffered
in the pending queue.
"""
return self._sub._pending_size

async def unsubscribe(self) -> None:
"""
unsubscribe destroys de inboxes of the pull subscription making it
Expand Down Expand Up @@ -729,6 +774,7 @@ async def _fetch_one(
while not queue.empty():
try:
msg = queue.get_nowait()
self._sub._pending_size -= len(msg.data)
status = JetStreamContext.is_status_msg(msg)
if status:
# Discard status messages at this point since were meant
Expand Down Expand Up @@ -783,6 +829,7 @@ async def _fetch_n(
while not queue.empty():
try:
msg = queue.get_nowait()
self._sub._pending_size -= len(msg.data)
status = JetStreamContext.is_status_msg(msg)
if status:
# Discard status messages at this point since were meant
Expand Down
49 changes: 49 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,24 @@ async def test_subscribe_next_msg(self):

for i in range(0, 2):
await nc.publish(f"tests.{i}", b'bar')
await nc.flush()

# A couple of messages would be received then this will unblock.
await asyncio.sleep(1)
assert sub.pending_msgs == 2
assert sub.pending_bytes == 6
msg = await sub.next_msg()
self.assertEqual("tests.0", msg.subject)

assert sub.pending_msgs == 1
assert sub.pending_bytes == 3

msg = await sub.next_msg()
self.assertEqual("tests.1", msg.subject)

assert sub.pending_msgs == 0
assert sub.pending_bytes == 0

# Nothing retrieved this time.
with self.assertRaises(nats.errors.TimeoutError):
await sub.next_msg(timeout=0.5)
Expand All @@ -606,6 +616,45 @@ async def test_subscribe_next_msg(self):

# await future

@async_test
async def test_subscribe_next_msg_custom_limits(self):
errors = []

async def error_cb(err):
errors.append(err)

nc = await nats.connect(error_cb=error_cb)
sub = await nc.subscribe(
'tests.>',
pending_msgs_limit=5,
pending_bytes_limit=-1,
)
await nc.flush()

for i in range(0, 6):
await nc.publish(f"tests.{i}", b'bar')
await nc.flush()

# A couple of messages would be received then this will unblock.
await asyncio.sleep(1)

# There should be one slow consumer error
assert len(errors) == 1
assert type(errors[0]) is nats.errors.SlowConsumerError

assert sub.pending_msgs == 5
assert sub.pending_bytes == 15
msg = await sub.next_msg()
self.assertEqual("tests.0", msg.subject)
assert sub.pending_msgs == 4
assert sub.pending_bytes == 12

for i in range(0, sub.pending_msgs):
await sub.next_msg()
assert sub.pending_msgs == 0
assert sub.pending_bytes == 0
await nc.close()

@async_test
async def test_subscribe_without_coroutine_unsupported(self):
nc = NATS()
Expand Down

0 comments on commit 4269801

Please sign in to comment.