Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature batch receive #1464

Closed
OfferLifted opened this issue May 12, 2024 · 2 comments
Closed

Feature batch receive #1464

OfferLifted opened this issue May 12, 2024 · 2 comments

Comments

@OfferLifted
Copy link

Currently there is no way of the client receiving more than 1 message at a time from the .messages queue (as far as I'm aware).
In the documentation I couldn't find anything related to the deque used by .messages on the WebSocketCommonProtocol. Probably because it's used under the hood and shouldn't be used as a public attribute.
Consider a scenario where bursts of messages are sent by the server and are received by the client using:

async for msg in websocket:
    process(msg)

Under the hood this uses the __aiter__ which in turn yields await self.recv().
This is all completely fine however it would be nice to have the option to receive a batch of messages if the client just received a burst of messages in its queue so that a batch/burst can be processed at once.
Now you probably don't want to always dump out all data in the queue. If you application got backed up and the queue is full it's probably a bad idea to get out the completely filled queue at once so perhaps a max batch size argument can be provided somewhere to control it (maybe using the max_queue parameter).

I hacked together something primitive that kinda works (no max batch size it just dumps out the entire queue). Consider it a proof of concept.

Add a recv_batch function:

        
    async def recv_batch(self) -> List[Data]:
        """
        Receive all available messages.

        This method collects all messages available in the queue at the time of call and returns them as a list.
        If the queue is empty and the connection is open, it waits for at least one message.

        Like the original `recv` method, when the connection is closed, this method raises
        :exc:`~websockets.exceptions.ConnectionClosed`.

        Canceling this method is safe. There's no risk of losing messages as the next invocation will return them.

        This can still be used with :func:`~asyncio.timeout` or :func:`~asyncio.wait_for` to enforce timeouts.

        Returns:
            List[Data]: A list of messages, each being a string (:class:`str`) for a Text frame or a bytestring (:class:`bytes`)
            for a Binary frame.

        Raises:
            ConnectionClosed: When the connection is closed.
            RuntimeError: If two coroutines call this method concurrently.
        """
        if self._pop_message_waiter is not None:
            raise RuntimeError(
                "cannot call recv_batch while another coroutine "
                "is already waiting for the next message"
            )

        messages = []
        while len(self.messages) <= 0:
            pop_message_waiter: asyncio.Future[None] = self.loop.create_future()
            self._pop_message_waiter = pop_message_waiter
            try:
                await asyncio.wait(
                    [pop_message_waiter, self.transfer_data_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )
            finally:
                self._pop_message_waiter = None

            if not pop_message_waiter.done():
                if self.legacy_recv:
                    return messages
                else:
                    await self.ensure_open()

        # Collect all available messages from the queue.
        while self.messages:
            message = self.messages.popleft()
            messages.append(message)

            if self._put_message_waiter is not None:
                self._put_message_waiter.set_result(None)
                self._put_message_waiter = None

        return messages

change the __aiter__ to yield await the new batch_recv() instead of self.recv()


    async def __aiter__(self) -> AsyncIterator[Data]:
        """
        Iterate on incoming messages.

        The iterator exits normally when the connection is closed with the close
        code 1000 (OK) or 1001 (going away) or without a close code.

        It raises a :exc:`~websockets.exceptions.ConnectionClosedError`
        exception when the connection is closed with any other code.

        """
        try:
            while True:
                yield await self.recv_batch()
        except ConnectionClosedOK:
            return

Again this is a temporary hacked together solution and might break all sorts of things I didn't think about or have some big performance drawbacks.

Having this new __aiter__ we can now again use the following syntax:

async for msg in websocket:
    process(msg)

Instead of getting out 1 message (data) at a time we now get a list of messages (data).

@aaugustin
Copy link
Member

I'm not quite sure how this API would work in practice. You could easily get counter-intuitive results.

Let's say the server starts sending a "batch" of events. websockets starts receiving them. How does it know that it has reached the end of the batch?

If you know how many events are in the batch, just receive that many events with a loop.

If you don't know, well... I'm quite unsure about what the behavior should be and I don't want to provide an API with ill-defined or potentially confusing semantics.

If you really want to receive "whatever has already arrived", you can receive events in a loop with a very short timeout.

(untested code -- treat it as pseudo-code)

messages = []
try:
    while True:
        messages.append(await asyncio.wait_for(websocket.recv(), timeout=0.01))
except TimeoutError:
    pass

@aaugustin
Copy link
Member

To be clear: I'm leaning against adding this API because I don't think that the semantics are obvious enough.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants