New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial draft on PUBSUB API and tests #879
base: main
Are you sure you want to change the base?
Conversation
python/python/tests/test_pubsub.py
Outdated
request=request)) | ||
all_clients[-1].connect_and_subscribe() | ||
|
||
TestPubSub.publish_and_assert(all_clients) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Add unsubscription tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
@dataclass | ||
class PubSubMsg: | ||
""" Describes incomming message """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incomming -> incoming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
|
||
|
||
MessagesByExactChannel = Dict[str, Dict[str, int]] # channel -> {msg -> cnt} | ||
MessagesByChannelAndChannel = Dict[Tuple[str, Optional[str]], Dict[str, int]] # (channel, pattern) -> {msg -> cnt} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ByChannelAndPattern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -1232,3 +1234,104 @@ async def zrem( | |||
int, | |||
await self._execute_command(RequestType.Zrem, [key] + members), | |||
) | |||
|
|||
@dataclass | |||
class PubSubMsg: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docstring example for class' args in python:
@dataclass
class TestClass:
"""This is a test class for dataclasses.
This is the body of the docstring description.
Args:
var_int (int): An integer.
var_str (str): A string.
"""
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what style it it? I find it more complicated then the proposed form. What is the difference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is one of python's most common docstring formats that we use throughout the project
https://www.sphinx-doc.org/en/master/usage/extensions/example_google.html#example-google
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no example for the dataclass in the sphinx doc, "args" are also not applicable to dataclass. I`ll follow the chat-gtp proposed style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Pattern = 2 | ||
""" Use channel name patterns """ | ||
Sharded = 3 | ||
""" Use sharded PUBSUB. See https://redis.io/docs/interact/pubsub/#sharded-pubsub for more details """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: remove redundant space
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
channels_or_patterns: Set[str], | ||
channel_mode: ChannelModes, | ||
callback: Callable[[PubSubMsg, Optional[Any]]], | ||
context: Optional[Any]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what should the context be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anything to pass to callback together with the new message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so it isn't clear from this documentation:
context: User-provided context for this subscription
also the typing seems off, it should be Callable[[arg_type1, arg_type2], return_type]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
context: User-provided context for this subscription | ||
|
||
Examples: | ||
>>> await client.subscribe({"local-news"}, ChannelModes.Exact, process_news_message, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add a simple example for process_news_message too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
See https://redis.io/docs/interact/pubsub for more details. | ||
|
||
Args: | ||
channels_or_patterns: Set of channels or patterns to subscibe to. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the args docstring should include typing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
async def publish(self, | ||
message: str, | ||
channels: Set[str], | ||
sharded: bool = False) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the API different here? with sharded as a bool and all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we review this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not following - we are reviewing this file :) Different from what
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, my bad. publish can't be done with patterns. this is why the api is different from subscribe :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
self.cluster_mode = cluster_mode | ||
self.request = request | ||
|
||
self.redis_client: CoreCommands = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CoreCommands? should be TRedisClient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do I need union? - I am only using the CoreCommands methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
@staticmethod | ||
def handle_new_message(message: CoreCommands.PubSubMsg, client: TestPubSub.PubSubClient) -> None: | ||
key = (message.channel, message.pattern) | ||
if key not in client.messages_received: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
key = (message.channel, message.pattern)
channel_or_pattern_msgs = client.messages_received.setdefault(key, {})
if message not in channel_or_pattern_msgs:
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
niiice TY
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
channels_or_patterns: Set[str], | ||
channel_mode: ChannelModes, | ||
callback: Callable[[PubSubMsg, Optional[Any]]], | ||
context: Optional[Any]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why returning None and not OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont we use Exceptions to signal errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just saw redis' doc "When successful, this command doesn't return anything", so ignore this comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
client.messages_received[key][message] += 1 | ||
|
||
|
||
async def connect_and_subscribe(self) -> List[Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix return type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
async def connect_and_subscribe(self) -> List[Any]: | ||
self.redis_client = await create_client(self.request, self.cluster_mode) | ||
|
||
for channel_mode, channels_or_patterns in self.subsciptions: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in self.subsciptions.items()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing tests for re-subscription -
- subscribe to channel foo with callback a & context a', subscribe to channel foo with callback b & context b', verify that only b is called with b'.
- subscribe to channels foo & bar, subscribe to channel foo, check that callback a is called for messages on bar, callback b for messages on foo.
- what should happen if you subscribe to pattern foo, and then subscribe to channel foo?
probably more combination of actions that I didn't consider now.
meaning subsequent subsciption for the same channel/pattern will override the previous association, dereferencing the original callback object | ||
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes | ||
count as distinc subscriptions and will produce duplicated messages. | ||
Note2: Patterns are not applicable in Sharded mode. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't very clear, for anyone not familiar with the details. If you want to say this, it needs to be explained better.
Maybe
"Patterns are not applicable in Sharded mode. That is, it is not possible to create a sharded subscription with a pattern, and pattern/exact subscriptions won't receive messages which were published with `spublish`"
or some similar phrasing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
Args: | ||
message: Message to publish | ||
channels: Set of channels or patterns to publish the message on. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIS Both PUBLISH
& SPUBLISH
only publish on a single channel, not a set. neither publishes on patterns. Do you want to expand our functionality beyond what Redis offers? why?
https://redis.io/commands/publish/
https://redis.io/commands/spublish/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I dont want, the arg should be a single channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
sharded: Use sharded PUBSUB mode. | ||
|
||
Returns: | ||
int: Number of clients that received the message. //TODO: Consider None, do we really want it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the Redis API, why give less?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its a weak hint, but I agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
""" | ||
pass | ||
|
||
# async def get_pubsub_message(self) -> PubSubMsg: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
|
||
def calculate_expected_messages(self, all_clients: List[TestPubSub.PubSubClient]) -> None: | ||
for sender in all_clients: | ||
for sharded_channel, sharded_channel_messages in sender.sharded_messages_to_publish.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the nested loops can be flattened using flat_map or list comprehensions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it becomes harder to understand the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
""" PUBSUB basic happy case using exact channel names - tests that clients receive all the messages using exact subscribe to a single channel """ | ||
|
||
CHANNEL_NAME = "test-channel" | ||
CLIENTS_COUNT = 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the simple tests should have only a single client - if they fail, we want debugging to also be simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The easiest functional setting is 2, will change to that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is 2 easier than 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
of course it is not easier, it is more functional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does "more functional" mean here?
it does more? I agree, but a test should do as little as possible, in order to focus the coder on the source of the error if the test fails. In that sense, adding also gets, sets, and other calls to this test would make it more functional, but it doesn't help the test be focused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not agree that the test has to do as little as possible. The value of such tests are pretty low.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think we shall have a basic and simple test for each option (sharded/pattern/exact), a simple subscribe publish unsbuscribe. We shall have more complicated tests as well, but having a straightforward simple test will give us better understanding with failures
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of simple tests is that if they fail, it's easy to understand why they fail. The value of complex tests is that they can catch multiple points of failure. The value of a combination of complex & simple tests is that if the simple test failed, you can start by fixing it, and that will save you time debugging a more complex test - but if the simple test didn't fail, it still saves you time debugging the complex test, because it removes at least one scenario that needs to be considered. If you have multiple simple tests, this value aggregates.
So, we're not saying that you should have only simple tests, but please - ALSO add simple tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added based test test_pubsub_basic()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
@pytest.mark.parametrize("cluster_mode", [True, False]) | ||
@pytest.mark.parametrize("sharded_pubsub", [True, False]) | ||
async def test_pubsub_channel_boundary_exact(self, request, cluster_mode, sharded_pubsub): | ||
""" Tests that messages do not cross channel boundaries by exact subsctiption """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what this test does
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clients listening on "channel-x" wont get messages published on "channel-y"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this test? this is enforced by the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because we might have bugs...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what bugs do you expect this test to catch? "because we might have bugs" doesn't explain which tests are right and which aren't. The number of tests that haven't been written far exceeds those that have been - there's some kind of criteria that explain which tests are meaningful and which aren't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bugs that may lead to leakage of messages across the channels. This is something we have to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
|
||
@pytest.mark.parametrize("cluster_mode", [True, False]) | ||
async def test_pubsub_channel_boundary_pattern(self, request, cluster_mode): | ||
""" Tests that messages do not cross channel boundaries by pattern subsctiption """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above but involving patterns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
@pytest.mark.parametrize("cluster_mode", [True, False]) | ||
@pytest.mark.parametrize("sharded_pubsub", [True, False]) | ||
async def test_pubsub_callbacks_per_channel(self, request, cluster_mode, sharded_pubsub): | ||
""" Tests that callbacks are per channel """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should also check that the correct context is passed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
we also need tests for resubscription after disconnects / partial disconnects / failover. |
Args: | ||
message: Message to publish | ||
channels: Set of channels or patterns to publish the message on. | ||
sharded: Use sharded PUBSUB mode. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a link to https://redis.io/commands/spublish/, for users who don't know what sharded pubsub is and what this option really says
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elaborated, done
callback=TestPubSub.PubSubClient.handle_new_message, | ||
context=self) | ||
|
||
def assert_client(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_client => verify_received_expected_messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rejecting, assert_client is more suitable in my opinion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_client
- what? the name is doesn't explain what the function does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it asserts client. Explanatory enough, done
channel_and_pattern = sharded_channel, None | ||
if channel_and_pattern not in self.messages_to_receive: | ||
self.messages_to_receive[channel_and_pattern] = {} | ||
for message, cnt in sharded_channel_messages.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...
channel_and_pattern = sharded_channel, None
for message, cnt in sharded_channel_messages.items():
channel_messages = self.messages_to_receive.setdefault(channel_and_pattern, {})
channel_messages.setdefault(message, 0)
channel_messages[message] += cnt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, done
python/python/tests/test_pubsub.py
Outdated
coroutines = [] | ||
for client in all_clients: | ||
coroutines += client.sched_publish_messages() | ||
asyncio.wait(coroutines) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think https://docs.python.org/3/library/asyncio-task.html#asyncio.gather is what you need here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed for now, done
python/python/tests/test_pubsub.py
Outdated
@staticmethod | ||
async def publish_and_assert(all_clients: List[TestPubSub.PubSubClient]): | ||
for client in all_clients: | ||
client.publish_messages() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing await
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
python/python/tests/test_pubsub.py
Outdated
|
||
# TODO: Allow async fanning out? | ||
def sched_publish_messages(self) -> List[Awaitable[None]]: | ||
# coroutines = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed, done
python/python/tests/test_pubsub.py
Outdated
context=None) | ||
|
||
await redis_client.publish(message="hi", channels={"test-channel-1"}, sharded=sharded_pubsub) | ||
# FIXME - wait for receive with TO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
take a look on the init_future and init_callback in the create function in redis_client.py, you can use this pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i will use a simple async sleep to allow the backend to complete the sending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…assing. 3. Python DEVELOPER.md fixes to ignore .env folder when running linters
message (str): Incoming message. | ||
channel (str): Name of an channel that triggered the message. | ||
pattern (Optional[str]): Pattern that triggered the message. | ||
context (Optional[str]): User-provided context for this subscription. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the context part of the message? IMO it's a break of encapsulation, and not a part of the message concept. The context should be passed as a second argument to the callback, not hidden inside the message.
Also, why is the context a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes the usage more convenient - no need to access 2 params in the callback. Redis message concept does not mean it should be bit-exact to the API.
Optional[str] is a bug, will change to Optional[Any]
Done
context: Optional[Any], | ||
) -> None: | ||
""" | ||
Add specific channels or set of channels defined by a pattern to active subscriptions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
specific channels or set of channels
- what does this mean? what's the difference between specific channels and a set of channels?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
read the whole sentence - "...or set of channels defined by a pattern"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rephrased - "Add specific channels or channels patterns to active subscriptions."
Done
meaning subsequent subsciption for the same channel/pattern will override the previous association, | ||
dereferencing the original callback object. | ||
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes | ||
count as distinc subscriptions and will produce duplicated messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
distinc -> distinct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TY, done
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes | ||
count as distinc subscriptions and will produce duplicated messages. | ||
Note2: Patterns are not applicable in Sharded mode. That is, it is not possible to create a sharded subscription with a pattern, | ||
and pattern/exact subscriptions won't receive messages which were published with using Sharded mode. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
published with using Sharded mode -> published in Sharded mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
missing test: client using RESP2 receives informative error when trying to subscribe, no subscription is created, and connection remains valid. |
|
||
@dataclass | ||
class PubSubMsg: | ||
"""Describes incoming message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: Describes incoming message => Describes incoming PubSub message
|
||
Attributes: | ||
message (str): Incoming message. | ||
channel (str): Name of an channel that triggered the message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name of an channel that triggered the message. => The name of the channel that triggered the message.
"""Describes incoming message | ||
|
||
Attributes: | ||
message (str): Incoming message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incoming message. => Incoming message content
Attributes: | ||
message (str): Incoming message. | ||
channel (str): Name of an channel that triggered the message. | ||
pattern (Optional[str]): Pattern that triggered the message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel pattern that triggered the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can we have both the channel name and a pattern? shouldn't it be only one of pattern/channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When subscribed to a pattern, the channel name is being received with new messages?
@@ -1368,3 +1373,111 @@ async def invoke_script( | |||
["foo", "bar"] | |||
""" | |||
return await self._execute_script(script.get_hash(), keys, args) | |||
|
|||
@dataclass | |||
class PubSubMsg: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General note: lets move all pubsub code to a dedicated pubusb.py file under the async_commands folder
channels_or_patterns (Set[str]): Set of channels or patterns to subscibe to. | ||
channel_mode (ChannelModes): Mode of operation. See ChannelModes. | ||
callback (Callable[[CoreCommands.PubSubMsg], None]): Callback to be called for each new message. | ||
For each channel or pattern there is exactly one active callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix indent
context (Optional[Any]): User-provided context for this subscription. Will be passed to callback as part of PubSubMsg. | ||
|
||
Examples: | ||
>>> def process_news_message(message: PubSubMsg) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
process_news_message => process_new_message
|
||
Examples: | ||
>>> def process_news_message(message: PubSubMsg) -> None: | ||
>>> print(f"'{message.context}' received new message '{message.message}' from channel '{message.channel}') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't clear from the documentation that the context will be provided as a member of PubSubMsg, and that the member's name is context and so on.. without the example I wouldn't know this is how it should be used.
Why not providing the context as an argument of the callback function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def process_news_message(message: PubSubMsg, context: Optional[Any]) -> None:
print(f"'{context}' received new message '{message.message}' from channel '{message.channel}')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would do this a more explanatory example:
def process_new_message(message: PubSubMsg, context: Dict[str, List[str]]):
context.set_default(message.channel, [])
context[message.channel].append(message.message)
Args: | ||
channels_or_patterns (Set[str]): Set of channels or patterns to unsubscibe from. | ||
Empty set unsubscribes from all channels. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove newline
""" | ||
... | ||
|
||
async def unsubscribe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are adding the redis documentation for all commands, I see you added only a link to https://redis.io/docs/interact/pubsub. We should have a like to https://redis.io/commands/unsubscribe/ (or other commands) as well
await redis_client.subscribe( | ||
channels_or_patterns={CHANNEL_A_NAME}, | ||
channel_mode=( | ||
CoreCommands.ChannelModes.Sharded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: Can you import ChannelModes, so the code would be cleaner?
await redis_client.publish( | ||
message="hi", channel=CHANNEL_A_NAME, sharded=sharded_pubsub | ||
) | ||
await asyncio.sleep(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from past experience: these pubsub tests with sleep tend to be flakey. Instead, can you wait on a future and pass it as a context?
Issue #, if available:
#218
Description of changes:
This is not compilable draft on PUBSUB APIs (which we already had couple of iteration on with the team) and proposed tests that formalize and validate the semantics of the functionality
The main focus should be put on calculate_expected_messages() method that calculates the expected messages propagation for a given configuration
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.