
MemoryObjectStream can drop items when the receiving end is cancelled #728

Open
2 tasks done
agronholm opened this issue May 9, 2024 · 7 comments · May be fixed by #735
Labels
bug Something isn't working

Comments

@agronholm
Owner

agronholm commented May 9, 2024

Things to check first

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

AnyIO version

4.3.0

Python version

3.8

What happened?

If a task (A) sends an item to another task (B) via a memory object stream while B is waiting for an item and has a pending cancellation, the item is still handed to B. B then raises the cancellation instead of returning the item, so the item is effectively dropped on the floor.

A similar issue was reported in #146, but it seems that it wasn't fixed as thoroughly as I had hoped.
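A minimal sketch of this race in plain asyncio, assuming a simplified design that is not AnyIO's actual implementation: the hypothetical ToyEventStream hands an item to a blocked receiver by writing it into that receiver's slot and setting an Event, and WouldBlock is a local stand-in for anyio.WouldBlock. Because the sender never checks whether the receiver has a pending cancellation, the item ends up in a slot that nobody will read.

```python
import asyncio
from collections import deque


class WouldBlock(Exception):
    """Local stand-in for anyio.WouldBlock (assumption for this toy model)."""


class _Waiter:
    """One blocked receiver: an Event to wake it and a slot for the item."""

    def __init__(self):
        self.event = asyncio.Event()
        self.item = None


class ToyEventStream:
    """Hypothetical toy stream; NOT AnyIO's MemoryObjectStream."""

    def __init__(self, max_buffer_size):
        self.max_buffer_size = max_buffer_size
        self.buffer = deque()
        self.waiters = deque()

    def send_nowait(self, item):
        if self.waiters:
            waiter = self.waiters.popleft()
            waiter.item = item  # handed over without checking for cancellation
            waiter.event.set()
        elif len(self.buffer) < self.max_buffer_size:
            self.buffer.append(item)
        else:
            raise WouldBlock

    def receive_nowait(self):
        if self.buffer:
            return self.buffer.popleft()
        raise WouldBlock

    async def receive(self):
        if self.buffer:
            return self.buffer.popleft()
        waiter = _Waiter()
        self.waiters.append(waiter)
        try:
            await waiter.event.wait()  # raises CancelledError if cancelled
        finally:
            if waiter in self.waiters:
                self.waiters.remove(waiter)
        return waiter.item


async def main():
    stream = ToyEventStream(max_buffer_size=1)
    receiver = asyncio.create_task(stream.receive())
    await asyncio.sleep(0)      # let the receiver block in receive()
    receiver.cancel()           # cancellation requested but not yet delivered
    stream.send_nowait("item")  # no checkpoint in between: item goes to the receiver
    try:
        await receiver          # the receiver raises CancelledError...
    except asyncio.CancelledError:
        pass
    try:
        return stream.receive_nowait()
    except WouldBlock:
        return "dropped"        # ...and the item is gone


print(asyncio.run(main()))  # prints "dropped"
```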

How can we reproduce the bug?

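import anyio
from anyio import (
    CancelScope, create_memory_object_stream, create_task_group,
    wait_all_tasks_blocked
)

async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        # Hand the cancel scope back to the parent so it can cancel this task
        task_status.started(cancel_scope)
        await receive.receive()

async def main():
    # Buffered stream (max_buffer_size=1), so the item has somewhere to go
    send, receive = create_memory_object_stream(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()  # the receiver is now blocked in receive()
            cancel_scope.cancel()           # request cancellation; not yet delivered
            send.send_nowait("item")        # no checkpoint in between

        # The receiver was cancelled, so the item should still be in the buffer
        assert receive.receive_nowait() == "item"

anyio.run(main)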

The above snippet reproduces the problem (WouldBlock is raised) on both asyncio and Trio. On Trio, if create_memory_object_stream() is replaced with trio.open_memory_channel(), the snippet no longer raises an exception.

agronholm added the bug label May 9, 2024
@gschaffner
Collaborator

did you mean to include that assert in your reproducer? I would expect assert receive.receive_nowait() == "item", if reachable, to raise WouldBlock:

  • receive_nowait() can only receive an item from a buffer or from a waiting sender, but in your reproducer there is neither: the stream is unbuffered and there are no await send() calls, only send_nowait() calls.

  • looking at history, there's a test relevant to this: I think that your reproducer's assertion is asserting the same thing that test_cancel_during_receive_last_receiver did. test_cancel_during_receive_last_receiver was removed in a3af1da because it was deemed incorrect. IIUC it was incorrect because closing an unbuffered memory object stream pair is supposed to be guaranteed to not drop items (there is no buffer, so there are no items to possibly drop). in the ~day between bd9a310 and a3af1da, AnyIO could push items beyond the buffer's limit, meaning that closing an unbuffered stream pair could incorrectly cause items to be dropped. but a3af1da fixed this and removed the incorrect test.

@gschaffner
Collaborator

gschaffner commented May 11, 2024

if I remove that assertion from your reproducer, then I believe your reproducer demonstrates at least one bug, but possibly two:

  1. the bug of AnyIO dropping the item in your reproducer.

  2. the (possible) bug of send_nowait() raising WouldBlock when using trio_memory_channel.send_nowait() but not raising WouldBlock when using anyio_memory_stream.send_nowait().

    unlike (1), this does not cause items to be lost.

I think it's worth discussing these two things separately because (1) is the urgent issue (data loss), and it can be fixed first without doing the harder thing of also fixing (2). in more detail:

  1. this was a regression that was introduced in #595 ("Added an alternate fix for MemoryObjectReceiveStream.receive() on asyncio"). #595 as a whole should not be reverted, as it fixed #536 ("On asyncio, Event.set() sometimes fails to notify all waiting tasks"), but partially reverting it would fix (1) and prevent item loss. here's a PR for that: #729 ("Fix memory streams incorrectly raising cancelled when *_nowait() is called immediately after cancelling send()/receive()").

  2. it's not clear to me at the moment if this is a bug or not. regarding the send_nowait() and await receive() calls, I'd expect one of two things to happen. either of these options would be fine in the sense that neither of them can cause AnyIO to drop items:

    i. send_nowait() returns success and await receive.receive() receives the item.

      this is what AnyIO did prior to #595, when (1) regressed.

      currently this is almost what AnyIO is doing, except that currently send_nowait() can go through while await receive.receive() raises cancelled, losing an item (i.e. (1)).

      this behavior can be explained as follows: although the await receive() call's scope gets a cancellation request before send_nowait(), there's no checkpoint between cancel_scope.cancel() and send_nowait(), so the send_nowait() call happens and succeeds before await receive() has a chance to react to the cancellation request. at that point, receive() has already received an item, so it's too late for it to raise cancelled (see also #147 (comment), "Pass along the received item to the next receiver if the task was cancelled").

    ii. send_nowait() raises WouldBlock and await receive.receive() raises cancelled.

      this is the behavior when replacing the AnyIO memory stream with a Trio memory channel. I don't know Trio's design reason for this (assuming it was explicitly intentional), but the technical reason is that Trio memory channels are not implemented using Event; they're implemented using wait_task_rescheduled. specifically, see #146 (comment) ("Object stream randomly drops items").

    question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?

    in the case that this is deemed to be a bug (i.e. if Trio does document this behavior, so AnyIO should match Trio's documented behavior), then I agree with your Gitter message that

    I think this could be solved if the sender could skip recipients that have been cancelled

    doing that in a backend agnostic manner is the tough part though :)

    I think that one backend-agnostic manner to do this could be to implement anyio.lowlevel.wait_task_rescheduled and have memory streams use that instead of Event.

    also, in the case that this is deemed to be a bug, here's a test: e70b0e4
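Option (ii), and the idea of the sender skipping cancelled recipients, can be sketched in plain asyncio. This is a hypothetical toy model (ToyFutureStream and the local WouldBlock are made up for illustration; it is neither Trio's code nor the proposed anyio.lowlevel.wait_task_rescheduled): each blocked receiver awaits its own Future, and on asyncio Task.cancel() immediately cancels the Future the task is currently awaiting, so the sender can see the pending cancellation and skip that receiver. With a buffer the item is kept; without one, send_nowait() raises WouldBlock, as in (ii).

```python
import asyncio
from collections import deque


class WouldBlock(Exception):
    """Local stand-in for anyio.WouldBlock (assumption for this toy model)."""


class ToyFutureStream:
    """Hypothetical toy stream: one Future per blocked receiver."""

    def __init__(self, max_buffer_size):
        self.max_buffer_size = max_buffer_size
        self.buffer = deque()
        self.waiters = deque()

    def send_nowait(self, item):
        while self.waiters:
            fut = self.waiters.popleft()
            if not fut.done():  # a cancelled receiver's Future is already done
                fut.set_result(item)
                return
        if len(self.buffer) < self.max_buffer_size:
            self.buffer.append(item)
        else:
            raise WouldBlock

    async def receive(self):
        if self.buffer:
            return self.buffer.popleft()
        fut = asyncio.get_running_loop().create_future()
        self.waiters.append(fut)
        try:
            # Caveat: this toy only handles cancel-before-send. If the item is
            # delivered first and the task is cancelled before it resumes,
            # the item would still be lost here.
            return await fut
        finally:
            if fut in self.waiters:
                self.waiters.remove(fut)


async def demo(max_buffer_size):
    stream = ToyFutureStream(max_buffer_size)
    receiver = asyncio.create_task(stream.receive())
    await asyncio.sleep(0)  # let the receiver block on its Future
    receiver.cancel()       # cancels that Future right away
    try:
        stream.send_nowait("item")
        sent = True
    except WouldBlock:
        sent = False
    try:
        await receiver
    except asyncio.CancelledError:
        pass
    return sent, list(stream.buffer)


print(asyncio.run(demo(1)))  # (True, ['item'])
print(asyncio.run(demo(0)))  # (False, [])
```

Unlike Event.set(), which reports nothing back, checking the per-receiver Future gives the sender the feedback it needs to pick another destination for the item.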

(1) is the urgent problem (dropped items). since it's straightforward to fix, I think that it should be fixed first, and whether (2) is a bug or not (and, if so, how to resolve it in a backend-agnostic manner) can be figured out later (perhaps in a separate issue).

@agronholm
Owner Author

Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?

@agronholm
Owner Author

I've adjusted the snippet accordingly.

@agronholm
Owner Author

Compare to the use of open_memory_channel():

import anyio
from anyio import (
    CancelScope, create_task_group,
    wait_all_tasks_blocked,
)
from trio import open_memory_channel

async def receiver(receive, task_status):
    with CancelScope() as cancel_scope:
        task_status.started(cancel_scope)
        await receive.receive()

async def main():
    send, receive = open_memory_channel(1)
    with send, receive:
        async with create_task_group() as tg:
            cancel_scope = await tg.start(receiver, receive)
            await wait_all_tasks_blocked()
            cancel_scope.cancel()
            send.send_nowait("item")

        assert receive.receive_nowait() == "item"

anyio.run(main, backend="trio")

@gschaffner
Collaborator

Ah, my intention was to create a memory object stream with a buffer. Then the assert makes sense, yes?

yes, agreed, assuming that your intention is to test for (2). but only (1) is needed to ensure that the stream isn't dropping items. (2) is a stricter criterion.

it's not clear to me whether (2) is actually a bug or problematic. what did you think about

question: is this behavior of Trio intentional/documented? i.e., does Trio document that (ii) will happen and (i) will not? or does Trio leave it undocumented, and it is just an implementation detail whether open_memory_channel is using Event or wait_task_rescheduled under the hood?

@gschaffner
Collaborator

gschaffner commented May 12, 2024

I have just written some more tests that are also closely related, plus a closely related fix. (these tests are currently in branch #729 (2b09585).) here is a summary of all of the variations of #146 that that branch is currently testing for (I think that this list makes a good summary of the relationships between these issues):

  • misbehavior when MemoryObjectReceiveStream.receive() is cancelled. the misbehavior is in the form of dropped items, i.e. neither the attempted sender nor the attempted receiver think they are responsible for the item.

    this is #146. it has two cases:

    • test_cancel_during_receive_after_send_nowait: this has been passing since 2020 (although it no longer supports native cancellations since #595, as mentioned in #595 (comment)).

    • test_cancel_during_receive_before_send_nowait: this is #728 (sense (1)). it had been passing since 2020 but started failing in #595.

  • misbehavior when MemoryObjectSendStream.send() is cancelled. the misbehavior is in the form of simultaneously-sent-and-not-sent items, i.e. both the attempted sender and the attempted receiver think they are responsible for the item.

    this issue was mentioned in #536:

    MemoryObjectSendStream.send(item) can raise cancelled after item has been delivered to a receiver! this MemoryObjectSendStream.send issue is just the send_event.wait() case to #146's receive_event.wait() case, i believe

    this also has two cases. they are analogous to #146's cases:

    • test_cancel_during_send_after_receive_nowait: this has been passing since #595.

    • test_cancel_during_send_before_receive_nowait: this is analogous to #728 (sense (1)). it is failing on master.

  • misbehavior when Event.wait() is cancelled: there are also two cases here:

    • test_event_wait_before_set_before_cancel: this was #536; it was fixed by #595 (although it remains broken with native cancellations).

    • test_event_wait_before_cancel_before_set: this has been behaving correctly for a while.

      this is closely related to #728 (sense (2)), because this Event behavior is the reason why the regression #728 (sense (2)) happened. specifically, this Event behavior is why it's not possible to implement Trio's memory channel cancellation semantics (the behavior of its abort_fns) by using purely the cancellation semantics of Event.
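The two Event orderings can be observed with plain asyncio.Event and native asyncio cancellation. This is a sketch outside AnyIO, so it does not show AnyIO's own Event behavior. In both orders the waiting task ends up cancelled, even when set() ran before cancel(), and set() returns None either way, so the code that set the event cannot tell whether its notification was actually consumed.

```python
import asyncio


async def run_case(set_before_cancel):
    """Return (waiter_cancelled, event_is_set) for one ordering."""
    event = asyncio.Event()
    waiter = asyncio.create_task(event.wait())
    await asyncio.sleep(0)  # the waiter is now blocked in event.wait()
    if set_before_cancel:
        event.set()         # wait -> set -> cancel
        waiter.cancel()
    else:
        waiter.cancel()     # wait -> cancel -> set
        event.set()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    return waiter.cancelled(), event.is_set()


print(asyncio.run(run_case(set_before_cancel=True)))   # (True, True)
print(asyncio.run(run_case(set_before_cancel=False)))  # (True, True)
```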

agronholm added a commit that referenced this issue May 14, 2024
Check if the receiving task has a pending cancellation before sending an item.

Fixes #728.
agronholm linked a pull request May 14, 2024 that will close this issue