Currently there is no way for the client to receive more than one message at a time from the `.messages` queue (as far as I'm aware).
In the documentation I couldn't find anything about the deque used by `.messages` on `WebSocketCommonProtocol`, probably because it's used under the hood and isn't meant to be a public attribute.
Consider a scenario where bursts of messages are sent by the server and are received by the client using:
```python
async for msg in websocket:
    process(msg)
```
Under the hood this uses `__aiter__`, which in turn yields `await self.recv()`.
This is all completely fine; however, it would be nice to have the option to receive a batch of messages when the client has just received a burst in its queue, so that the whole batch/burst can be processed at once.
Now you probably don't want to always dump out all the data in the queue. If your application got backed up and the queue is full, it's probably a bad idea to pull the completely filled queue out at once, so perhaps a max batch size argument could be provided somewhere to control it (maybe using the `max_queue` parameter).
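To make the cap idea concrete, here is a minimal sketch of draining at most `max_batch_size` messages from the front of a deque. `drain_queue` and `max_batch_size` are hypothetical names, not part of the websockets API; the deque stands in for the internal `.messages` queue.

```python
from collections import deque

def drain_queue(messages: deque, max_batch_size: int) -> list:
    # Pop at most max_batch_size messages from the front of the queue,
    # leaving the rest for a later call. This mirrors how a capped
    # recv_batch could avoid dumping a fully backed-up queue at once.
    batch = []
    while messages and len(batch) < max_batch_size:
        batch.append(messages.popleft())
    return batch
```

With a queue of five messages and `max_batch_size=3`, the first call returns three messages and leaves two behind.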
I hacked together something primitive that kinda works (no max batch size; it just dumps out the entire queue). Consider it a proof of concept.
Add a `recv_batch` function:
```python
async def recv_batch(self) -> List[Data]:
    """
    Receive all available messages.

    This method collects all messages available in the queue at the
    time of the call and returns them as a list. If the queue is empty
    and the connection is open, it waits for at least one message.

    Like the original ``recv`` method, when the connection is closed,
    this method raises :exc:`~websockets.exceptions.ConnectionClosed`.

    Canceling this method is safe. There's no risk of losing messages,
    as the next invocation will return them. It can still be used with
    :func:`~asyncio.timeout` or :func:`~asyncio.wait_for` to enforce
    timeouts.

    Returns:
        List[Data]: A list of messages, each being a string
        (:class:`str`) for a Text frame or a bytestring
        (:class:`bytes`) for a Binary frame.

    Raises:
        ConnectionClosed: When the connection is closed.
        RuntimeError: If two coroutines call this method concurrently.

    """
    if self._pop_message_waiter is not None:
        raise RuntimeError(
            "cannot call recv_batch while another coroutine "
            "is already waiting for the next message"
        )
    messages: List[Data] = []
    # Wait until at least one message arrives, mirroring recv().
    while len(self.messages) <= 0:
        pop_message_waiter: asyncio.Future[None] = self.loop.create_future()
        self._pop_message_waiter = pop_message_waiter
        try:
            await asyncio.wait(
                [pop_message_waiter, self.transfer_data_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            self._pop_message_waiter = None
        # If the waiter didn't complete, the transfer task finished:
        # the connection is closing or closed.
        if not pop_message_waiter.done():
            if self.legacy_recv:
                return messages
            else:
                await self.ensure_open()
    # Collect all available messages from the queue.
    while self.messages:
        message = self.messages.popleft()
        messages.append(message)
        if self._put_message_waiter is not None:
            self._put_message_waiter.set_result(None)
            self._put_message_waiter = None
    return messages
```
Change `__aiter__` to `yield await` the new `recv_batch()` instead of `self.recv()`:
```python
async def __aiter__(self) -> AsyncIterator[List[Data]]:
    """
    Iterate on incoming batches of messages.

    The iterator exits normally when the connection is closed with the
    close code 1000 (OK) or 1001 (going away) or without a close code.
    It raises a :exc:`~websockets.exceptions.ConnectionClosedError`
    exception when the connection is closed with any other code.
    """
    try:
        while True:
            yield await self.recv_batch()
    except ConnectionClosedOK:
        return
```
Again, this is a temporary, hacked-together solution that might break all sorts of things I didn't think about, or have some big performance drawbacks.
Having this new `__aiter__`, we can again use the following syntax:

```python
async for batch in websocket:
    process(batch)
```

Instead of getting one message (data) at a time, each iteration now yields a list of messages (data).
I'm not quite sure how this API would work in practice. You could easily get counter-intuitive results.
Let's say the server starts sending a "batch" of events. websockets starts receiving them. How does it know that it has reached the end of the batch?
If you know how many events are in the batch, just receive that many events with a loop.
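The known-size case the reply describes can be sketched as a plain loop over `recv()`. Here `recv_n` is a hypothetical helper and `recv` stands in for any zero-argument coroutine such as `websocket.recv`; an `asyncio.Queue` plays the part of the connection for the demo, and with a real connection `ConnectionClosed` would simply propagate out of the loop.

```python
import asyncio

async def recv_n(recv, n):
    # Receive exactly n messages by awaiting recv() n times.
    return [await recv() for _ in range(n)]

async def main():
    queue = asyncio.Queue()  # stands in for the incoming message queue
    for i in range(3):
        queue.put_nowait(f"msg-{i}")
    print(await recv_n(queue.get, 3))  # → ['msg-0', 'msg-1', 'msg-2']

asyncio.run(main())
```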
If you don't know, well... I'm quite unsure about what the behavior should be and I don't want to provide an API with ill-defined or potentially confusing semantics.
If you really want to receive "whatever has already arrived", you can receive events in a loop with a very short timeout.
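The short-timeout loop suggested above might look like the following sketch. `drain_available`, `idle_timeout`, and `max_batch` are illustrative names, not part of the websockets API; `recv` stands in for `websocket.recv`, whose cancellation (as `asyncio.wait_for` does on timeout) is documented to be safe.

```python
import asyncio

async def drain_available(recv, idle_timeout=0.05, max_batch=None):
    # Keep receiving until recv() takes longer than idle_timeout,
    # i.e. until nothing more has "already arrived", or until the
    # optional max_batch cap is reached.
    messages = []
    while max_batch is None or len(messages) < max_batch:
        try:
            messages.append(await asyncio.wait_for(recv(), idle_timeout))
        except asyncio.TimeoutError:
            break
    return messages

async def main():
    queue = asyncio.Queue()  # stands in for the incoming message queue
    for i in range(5):
        queue.put_nowait(i)
    print(await drain_available(queue.get))  # → [0, 1, 2, 3, 4]

asyncio.run(main())
```

Picking `idle_timeout` is the awkward part: too short and a batch gets split, too long and every drain pays the timeout once, which is essentially the ill-defined semantics the reply warns about.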