Skip to content

Commit

Permalink
Merge pull request #358 from nats-io/consumer-name
Browse files Browse the repository at this point in the history
js: support consumer with name
  • Loading branch information
wallyqs committed Sep 28, 2022
2 parents 517f480 + 0b35e50 commit 0fcfaf2
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 66 deletions.
8 changes: 8 additions & 0 deletions nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ class ConsumerConfig(Base):
References:
* `Consumers <https://docs.nats.io/jetstream/concepts/consumers>`_
"""
name: Optional[str] = None
durable_name: Optional[str] = None
description: Optional[str] = None
deliver_policy: Optional[DeliverPolicy] = DeliverPolicy.ALL
Expand All @@ -393,6 +394,9 @@ class ConsumerConfig(Base):
deliver_subject: Optional[str] = None
deliver_group: Optional[str] = None

# Ephemeral inactivity threshold
inactive_threshold: Optional[float] = None # in seconds

# Generally inherited by parent stream and other markers, now can
# be configured directly.
num_replicas: Optional[int] = None
Expand All @@ -404,12 +408,16 @@ class ConsumerConfig(Base):
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, 'ack_wait')
cls._convert_nanoseconds(resp, 'idle_heartbeat')
cls._convert_nanoseconds(resp, 'inactive_threshold')
return super().from_response(resp)

def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result['ack_wait'] = self._to_nanoseconds(self.ack_wait)
result['idle_heartbeat'] = self._to_nanoseconds(self.idle_heartbeat)
result['inactive_threshold'] = self._to_nanoseconds(
self.inactive_threshold
)
return result


Expand Down
27 changes: 16 additions & 11 deletions nats/js/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,23 @@ async def add_consumer(
req_data = json.dumps(req).encode()

resp = None
if durable_name is not None:
resp = await self._api_request(
f"{self._prefix}.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}",
req_data,
timeout=timeout
)
subject = ''
version = self._nc.connected_server_version
consumer_name_supported = version.major >= 2 and version.minor >= 9
if consumer_name_supported and config.name:
# NOTE: Only supported after nats-server v2.9.0
if config.filter_subject and config.filter_subject != ">":
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}.{config.filter_subject}"
else:
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}"
elif durable_name:
# NOTE: Legacy approach to create consumers. After nats-server v2.9
# name option can be used instead.
subject = f"{self._prefix}.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}"
else:
resp = await self._api_request(
f"{self._prefix}.CONSUMER.CREATE.{stream}",
req_data,
timeout=timeout
)
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}"

resp = await self._api_request(subject, req_data, timeout=timeout)
return api.ConsumerInfo.from_response(resp)

async def delete_consumer(self, stream: str, consumer: str) -> bool:
Expand Down
1 change: 0 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,6 @@ async def reconnected_cb():
reconnected.set_result(True)

async def err_cb(e):
print("ERROR: ", e)
nonlocal errors
errors.append(e)

Expand Down
241 changes: 187 additions & 54 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,139 @@ async def test_number_of_consumer_replicas(self):

await nc.close()

@async_test
async def test_consumer_with_name(self):
nc = NATS()
await nc.connect()
js = nc.jetstream()
jsm = nc.jsm()

tsub = await nc.subscribe("$JS.API.CONSUMER.>")

# Create stream.
await jsm.add_stream(name="ctests", subjects=["a", "b", "c.>"])
await js.publish("a", b'hello world!')
await js.publish("b", b'hello world!!')
await js.publish("c.d", b'hello world!!!')
await js.publish("c.d.e", b'hello world!!!!')

# Create ephemeral pull consumer with a name.
stream_name = "ctests"
consumer_name = "ephemeral"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
ack_policy="explicit",
)
assert cinfo.config.name == consumer_name

msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.ephemeral'

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'hello world!'
ok = await msgs[0].ack_sync()
assert ok

msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.ephemeral'

# Create durable pull consumer with a name.
consumer_name = "durable"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
durable_name=consumer_name,
ack_policy="explicit",
)
assert cinfo.config.name == consumer_name
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable'

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'hello world!'
ok = await msgs[0].ack_sync()
assert ok
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable'

# Create durable pull consumer with a name and a filter_subject
consumer_name = "durable2"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
durable_name=consumer_name,
filter_subject="b",
ack_policy="explicit",
)
assert cinfo.config.name == consumer_name
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable2.b'

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'hello world!!'
ok = await msgs[0].ack_sync()
assert ok
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable2'

# Create durable pull consumer with a name and a filter_subject
consumer_name = "durable3"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
durable_name=consumer_name,
filter_subject=">",
ack_policy="explicit",
)
assert cinfo.config.name == consumer_name
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable3'

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'hello world!'
ok = await msgs[0].ack_sync()
assert ok
msg = await tsub.next_msg()
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable3'

# name and durable must match if both present.
with pytest.raises(BadRequestError) as err:
await jsm.add_consumer(
stream_name,
name="name1",
durable_name="name2",
ack_policy="explicit",
)
assert err.value.err_code == 10017
assert err.value.description == 'consumer name in subject does not match durable name in request'

# Create ephemeral pull consumer with a name and inactive threshold.
stream_name = "ctests"
consumer_name = "inactive"
cinfo = await jsm.add_consumer(
stream_name,
name=consumer_name,
ack_policy="explicit",
inactive_threshold=2, # seconds
)
assert cinfo.config.name == consumer_name

sub = await js.pull_subscribe_bind(consumer_name, stream_name)
msgs = await sub.fetch(1)
assert msgs[0].data == b'hello world!'
ok = await msgs[0].ack_sync()
assert ok

cinfo = await sub.consumer_info()
assert cinfo.config.inactive_threshold == 2.0

await nc.close()


class SubscribeTest(SingleJetStreamServerTestCase):

Expand Down Expand Up @@ -2634,60 +2767,60 @@ async def test_account_limits(self):

# Check unmarshalling response with Tiers:
blob = """{
"type": "io.nats.jetstream.api.v1.account_info_response",
"memory": 0,
"storage": 6829550,
"streams": 1,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 0,
"max_streams": 0,
"max_consumers": 0,
"max_ack_pending": 0,
"memory_max_stream_bytes": 0,
"storage_max_stream_bytes": 0,
"max_bytes_required": false
},
"domain": "ngs",
"api": {
"total": 6,
"errors": 0
},
"tiers": {
"R1": {
"memory": 0,
"storage": 6829550,
"streams": 1,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 2000000000000,
"max_streams": 100,
"max_consumers": 1000,
"max_ack_pending": -1,
"memory_max_stream_bytes": -1,
"storage_max_stream_bytes": -1,
"max_bytes_required": true
}
},
"R3": {
"memory": 0,
"storage": 0,
"streams": 0,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 500000000000,
"max_streams": 25,
"max_consumers": 250,
"max_ack_pending": -1,
"memory_max_stream_bytes": -1,
"storage_max_stream_bytes": -1,
"max_bytes_required": true
}
}
}}
"type": "io.nats.jetstream.api.v1.account_info_response",
"memory": 0,
"storage": 6829550,
"streams": 1,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 0,
"max_streams": 0,
"max_consumers": 0,
"max_ack_pending": 0,
"memory_max_stream_bytes": 0,
"storage_max_stream_bytes": 0,
"max_bytes_required": false
},
"domain": "ngs",
"api": {
"total": 6,
"errors": 0
},
"tiers": {
"R1": {
"memory": 0,
"storage": 6829550,
"streams": 1,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 2000000000000,
"max_streams": 100,
"max_consumers": 1000,
"max_ack_pending": -1,
"memory_max_stream_bytes": -1,
"storage_max_stream_bytes": -1,
"max_bytes_required": true
}
},
"R3": {
"memory": 0,
"storage": 0,
"streams": 0,
"consumers": 0,
"limits": {
"max_memory": 0,
"max_storage": 500000000000,
"max_streams": 25,
"max_consumers": 250,
"max_ack_pending": -1,
"memory_max_stream_bytes": -1,
"storage_max_stream_bytes": -1,
"max_bytes_required": true
}
}
}}
"""

expected = nats.js.api.AccountInfo(
Expand Down

0 comments on commit 0fcfaf2

Please sign in to comment.