Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial draft on PUBSUB API and tests #879

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

Conversation

ikolomi
Copy link
Contributor

@ikolomi ikolomi commented Jan 30, 2024

Issue #, if available:
#218

Description of changes:
This is not compilable draft on PUBSUB APIs (which we already had couple of iteration on with the team) and proposed tests that formalize and validate the semantics of the functionality

The main focus should be put on calculate_expected_messages() method that calculates the expected messages propagation for a given configuration

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

request=request))
all_clients[-1].connect_and_subscribe()

TestPubSub.publish_and_assert(all_clients)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add unsubscription tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@dataclass
class PubSubMsg:
""" Describes incomming message """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incomming -> incoming

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done



MessagesByExactChannel = Dict[str, Dict[str, int]] # channel -> {msg -> cnt}
MessagesByChannelAndChannel = Dict[Tuple[str, Optional[str]], Dict[str, int]] # (channel, pattern) -> {msg -> cnt}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByChannelAndPattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -1232,3 +1234,104 @@ async def zrem(
int,
await self._execute_command(RequestType.Zrem, [key] + members),
)

@dataclass
class PubSubMsg:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstring example for class' args in python:

@dataclass
class TestClass:
    """This is a test class for dataclasses.

    This is the body of the docstring description.

    Args:
        var_int (int): An integer.
        var_str (str): A string.

    """

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what style it it? I find it more complicated then the proposed form. What is the difference?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is one of python's most common docstring formats that we use throughout the project
https://www.sphinx-doc.org/en/master/usage/extensions/example_google.html#example-google

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no example for the dataclass in the sphinx doc, "args" are also not applicable to dataclass. I`ll follow the chat-gtp proposed style

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Pattern = 2
""" Use channel name patterns """
Sharded = 3
""" Use sharded PUBSUB. See https://redis.io/docs/interact/pubsub/#sharded-pubsub for more details """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: remove redundant space

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

channels_or_patterns: Set[str],
channel_mode: ChannelModes,
callback: Callable[[PubSubMsg, Optional[Any]]],
context: Optional[Any]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should the context be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anything to pass to callback together with the new message

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it isn't clear from this documentation:

context: User-provided context for this subscription

also the typing seems off, it should be Callable[[arg_type1, arg_type2], return_type]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

context: User-provided context for this subscription

Examples:
>>> await client.subscribe({"local-news"}, ChannelModes.Exact, process_news_message, None)
Copy link
Contributor

@barshaul barshaul Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add a simple example for process_news_message too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

See https://redis.io/docs/interact/pubsub for more details.

Args:
channels_or_patterns: Set of channels or patterns to subscibe to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the args docstring should include typing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

async def publish(self,
message: str,
channels: Set[str],
sharded: bool = False) -> int:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the API different here? with sharded as a bool and all

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we review this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not following - we are reviewing this file :) Different from what

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, my bad. publish can't be done with patterns. this is why the api is different from subscribe :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self.cluster_mode = cluster_mode
self.request = request

self.redis_client: CoreCommands = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CoreCommands? should be TRedisClient

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do I need union? - I am only using the CoreCommands methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@staticmethod
def handle_new_message(message: CoreCommands.PubSubMsg, client: TestPubSub.PubSubClient) -> None:
key = (message.channel, message.pattern)
if key not in client.messages_received:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key = (message.channel, message.pattern)
channel_or_pattern_msgs = client.messages_received.setdefault(key, {})
if message not in channel_or_pattern_msgs:
   ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

niiice TY

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

channels_or_patterns: Set[str],
channel_mode: ChannelModes,
callback: Callable[[PubSubMsg, Optional[Any]]],
context: Optional[Any]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why returning None and not OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont we use Exceptions to signal errors?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just saw redis' doc "When successful, this command doesn't return anything", so ignore this comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

client.messages_received[key][message] += 1


async def connect_and_subscribe(self) -> List[Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix return type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

async def connect_and_subscribe(self) -> List[Any]:
self.redis_client = await create_client(self.request, self.cluster_mode)

for channel_mode, channels_or_patterns in self.subsciptions:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in self.subsciptions.items()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@shachlanAmazon shachlanAmazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing tests for re-subscription -

  1. subscribe to channel foo with callback a & context a', subscribe to channel foo with callback b & context b', verify that only b is called with b'.
  2. subscribe to channels foo & bar, subscribe to channel foo, check that callback a is called for messages on bar, callback b for messages on foo.
  3. what should happen if you subscribe to pattern foo, and then subscribe to channel foo?

probably more combination of actions that I didn't consider now.

python/python/glide/async_commands/core.py Show resolved Hide resolved
meaning subsequent subsciption for the same channel/pattern will override the previous association, dereferencing the original callback object
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes
count as distinc subscriptions and will produce duplicated messages.
Note2: Patterns are not applicable in Sharded mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't very clear, for anyone not familiar with the details. If you want to say this, it needs to be explained better.
Maybe
"Patterns are not applicable in Sharded mode. That is, it is not possible to create a sharded subscription with a pattern, and pattern/exact subscriptions won't receive messages which were published with `spublish`"

or some similar phrasing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


Args:
message: Message to publish
channels: Set of channels or patterns to publish the message on.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIS Both PUBLISH & SPUBLISH only publish on a single channel, not a set. neither publishes on patterns. Do you want to expand our functionality beyond what Redis offers? why?

https://redis.io/commands/publish/
https://redis.io/commands/spublish/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I dont want, the arg should be a single channel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sharded: Use sharded PUBSUB mode.

Returns:
int: Number of clients that received the message. //TODO: Consider None, do we really want it?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the Redis API, why give less?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a weak hint, but I agree

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"""
pass

# async def get_pubsub_message(self) -> PubSubMsg:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def calculate_expected_messages(self, all_clients: List[TestPubSub.PubSubClient]) -> None:
for sender in all_clients:
for sharded_channel, sharded_channel_messages in sender.sharded_messages_to_publish.items():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the nested loops can be flattened using flat_map or list comprehensions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it becomes harder to understand the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

""" PUBSUB basic happy case using exact channel names - tests that clients receive all the messages using exact subscribe to a single channel """

CHANNEL_NAME = "test-channel"
CLIENTS_COUNT = 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the simple tests should have only a single client - if they fail, we want debugging to also be simple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The easiest functional setting is 2, will change to that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is 2 easier than 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course it is not easier, it is more functional

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "more functional" mean here?
it does more? I agree, but a test should do as little as possible, in order to focus the coder on the source of the error if the test fails. In that sense, adding also gets, sets, and other calls to this test would make it more functional, but it doesn't help the test be focused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not agree that the test has to do as little as possible. The value of such tests are pretty low.

Copy link
Contributor

@barshaul barshaul Feb 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think we shall have a basic and simple test for each option (sharded/pattern/exact), a simple subscribe publish unsbuscribe. We shall have more complicated tests as well, but having a straightforward simple test will give us better understanding with failures

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value of simple tests is that if they fail, it's easy to understand why they fail. The value of complex tests is that they can catch multiple points of failure. The value of a combination of complex & simple tests is that if the simple test failed, you can start by fixing it, and that will save you time debugging a more complex test - but if the simple test didn't fail, it still saves you time debugging the complex test, because it removes at least one scenario that needs to be considered. If you have multiple simple tests, this value aggregates.

So, we're not saying that you should have only simple tests, but please - ALSO add simple tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added based test test_pubsub_basic()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("sharded_pubsub", [True, False])
async def test_pubsub_channel_boundary_exact(self, request, cluster_mode, sharded_pubsub):
""" Tests that messages do not cross channel boundaries by exact subsctiption """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this test does

Copy link
Contributor Author

@ikolomi ikolomi Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clients listening on "channel-x" wont get messages published on "channel-y"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this test? this is enforced by the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we might have bugs...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what bugs do you expect this test to catch? "because we might have bugs" doesn't explain which tests are right and which aren't. The number of tests that haven't been written far exceeds those that have been - there's some kind of criteria that explain which tests are meaningful and which aren't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugs that may lead to leakage of messages across the channels. This is something we have to test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_pubsub_channel_boundary_pattern(self, request, cluster_mode):
""" Tests that messages do not cross channel boundaries by pattern subsctiption """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above but involving patterns

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("sharded_pubsub", [True, False])
async def test_pubsub_callbacks_per_channel(self, request, cluster_mode, sharded_pubsub):
""" Tests that callbacks are per channel """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should also check that the correct context is passed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@shachlanAmazon
Copy link
Contributor

we also need tests for resubscription after disconnects / partial disconnects / failover.

Args:
message: Message to publish
channels: Set of channels or patterns to publish the message on.
sharded: Use sharded PUBSUB mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a link to https://redis.io/commands/spublish/, for users who don't know what sharded pubsub is and what this option really says

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elaborated, done

callback=TestPubSub.PubSubClient.handle_new_message,
context=self)

def assert_client(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_client => verify_received_expected_messages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rejecting, assert_client is more suitable in my opinion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_client - what? the name is doesn't explain what the function does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it asserts client. Explanatory enough, done

channel_and_pattern = sharded_channel, None
if channel_and_pattern not in self.messages_to_receive:
self.messages_to_receive[channel_and_pattern] = {}
for message, cnt in sharded_channel_messages.items():
Copy link
Contributor

@barshaul barshaul Feb 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...
channel_and_pattern = sharded_channel, None
for message, cnt in sharded_channel_messages.items():
    channel_messages = self.messages_to_receive.setdefault(channel_and_pattern, {})
    channel_messages.setdefault(message, 0)
    channel_messages[message] += cnt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, done

coroutines = []
for client in all_clients:
coroutines += client.sched_publish_messages()
asyncio.wait(coroutines)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed for now, done

@staticmethod
async def publish_and_assert(all_clients: List[TestPubSub.PubSubClient]):
for client in all_clients:
client.publish_messages()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing await

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


# TODO: Allow async fanning out?
def sched_publish_messages(self) -> List[Awaitable[None]]:
# coroutines = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why commented out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed, done

context=None)

await redis_client.publish(message="hi", channels={"test-channel-1"}, sharded=sharded_pubsub)
# FIXME - wait for receive with TO
Copy link
Contributor

@barshaul barshaul Feb 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take a look on the init_future and init_callback in the create function in redis_client.py, you can use this pattern

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will use a simple async sleep to allow the backend to complete the sending

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

…assing. 3. Python DEVELOPER.md fixes to ignore .env folder when running linters
message (str): Incoming message.
channel (str): Name of an channel that triggered the message.
pattern (Optional[str]): Pattern that triggered the message.
context (Optional[str]): User-provided context for this subscription.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the context part of the message? IMO it's a break of encapsulation, and not a part of the message concept. The context should be passed as a second argument to the callback, not hidden inside the message.
Also, why is the context a string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes the usage more convenient - no need to access 2 params in the callback. Redis message concept does not mean it should be bit-exact to the API.
Optional[str] is a bug, will change to Optional[Any]
Done

context: Optional[Any],
) -> None:
"""
Add specific channels or set of channels defined by a pattern to active subscriptions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specific channels or set of channels - what does this mean? what's the difference between specific channels and a set of channels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read the whole sentence - "...or set of channels defined by a pattern"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rephrased - "Add specific channels or channels patterns to active subscriptions."
Done

meaning subsequent subsciption for the same channel/pattern will override the previous association,
dereferencing the original callback object.
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes
count as distinc subscriptions and will produce duplicated messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

distinc -> distinct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TY, done

Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes
count as distinc subscriptions and will produce duplicated messages.
Note2: Patterns are not applicable in Sharded mode. That is, it is not possible to create a sharded subscription with a pattern,
and pattern/exact subscriptions won't receive messages which were published with using Sharded mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

published with using Sharded mode -> published in Sharded mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@shachlanAmazon
Copy link
Contributor

missing test: client using RESP2 receives informative error when trying to subscribe, no subscription is created, and connection remains valid.


@dataclass
class PubSubMsg:
"""Describes incoming message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Describes incoming message => Describes incoming PubSub message


Attributes:
message (str): Incoming message.
channel (str): Name of an channel that triggered the message.
Copy link
Contributor

@barshaul barshaul Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name of an channel that triggered the message. => The name of the channel that triggered the message.

"""Describes incoming message

Attributes:
message (str): Incoming message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incoming message. => Incoming message content

Attributes:
message (str): Incoming message.
channel (str): Name of an channel that triggered the message.
pattern (Optional[str]): Pattern that triggered the message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel pattern that triggered the message.

Copy link
Contributor

@barshaul barshaul Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we have both the channel name and a pattern? shouldn't it be only one of pattern/channel?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When subscribed to a pattern, the channel name is being received with new messages?

@@ -1368,3 +1373,111 @@ async def invoke_script(
["foo", "bar"]
"""
return await self._execute_script(script.get_hash(), keys, args)

@dataclass
class PubSubMsg:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General note: lets move all pubsub code to a dedicated pubusb.py file under the async_commands folder

channels_or_patterns (Set[str]): Set of channels or patterns to subscibe to.
channel_mode (ChannelModes): Mode of operation. See ChannelModes.
callback (Callable[[CoreCommands.PubSubMsg], None]): Callback to be called for each new message.
For each channel or pattern there is exactly one active callback.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix indent

context (Optional[Any]): User-provided context for this subscription. Will be passed to callback as part of PubSubMsg.

Examples:
>>> def process_news_message(message: PubSubMsg) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

process_news_message => process_new_message


Examples:
>>> def process_news_message(message: PubSubMsg) -> None:
>>> print(f"'{message.context}' received new message '{message.message}' from channel '{message.channel}')
Copy link
Contributor

@barshaul barshaul Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't clear from the documentation that the context will be provided as a member of PubSubMsg, and that the member's name is context and so on.. without the example I wouldn't know this is how it should be used.
Why not providing the context as an argument of the callback function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def process_news_message(message: PubSubMsg, context: Optional[Any]) -> None:
   print(f"'{context}' received new message '{message.message}' from channel '{message.channel}')

Copy link
Contributor

@barshaul barshaul Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do this a more explanatory example:

def process_new_message(message: PubSubMsg, context: Dict[str, List[str]]):
    context.set_default(message.channel, [])
    context[message.channel].append(message.message)

Args:
channels_or_patterns (Set[str]): Set of channels or patterns to unsubscibe from.
Empty set unsubscribes from all channels.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove newline

"""
...

async def unsubscribe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are adding the redis documentation for all commands, I see you added only a link to https://redis.io/docs/interact/pubsub. We should have a like to https://redis.io/commands/unsubscribe/ (or other commands) as well

await redis_client.subscribe(
channels_or_patterns={CHANNEL_A_NAME},
channel_mode=(
CoreCommands.ChannelModes.Sharded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Can you import ChannelModes, so the code would be cleaner?

await redis_client.publish(
message="hi", channel=CHANNEL_A_NAME, sharded=sharded_pubsub
)
await asyncio.sleep(1)
Copy link
Contributor

@barshaul barshaul Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from past experience: these pubsub tests with sleep tend to be flakey. Instead, can you wait on a future and pass it as a context?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants