trio.TrioInternalError when sync func run with trio.to_thread.run_sync returns #2878

Open
jpatel624 opened this issue Nov 14, 2023 · 10 comments

Comments

@jpatel624

Version: 0.23.1
Just trying out trio with some test code and got this:

python-BaseException
Traceback (most recent call last):
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2574, in unrolled_run
    runner.task_exited(task, final_outcome)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1800, in task_exited
    task._parent_nursery._child_finished(task, outcome)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1069, in _child_finished
    self._add_exc(outcome.error)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1057, in _add_exc
    self.cancel_scope.cancel()
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_ki.py", line 180, in wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 773, in cancel
    self._cancel_status.recalculate()
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 431, in recalculate
    task._attempt_delivery_of_any_pending_cancel()
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1411, in _attempt_delivery_of_any_pending_cancel
    self._attempt_abort(raise_cancel)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 1393, in _attempt_abort
    success = self._abort_func(raise_cancel)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_channel.py", line 206, in abort_fn
    self._tasks.remove(task)
KeyError: <Task '__main__.async_main' at 0x259b1770a40>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2240, in run
    timeout = gen.send(next_send)
              ^^^^^^^^^^^^^^^^^^^
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_run.py", line 2631, in unrolled_run
    raise TrioInternalError("internal error in Trio - please file a bug!") from exc
trio.TrioInternalError: internal error in Trio - please file a bug!

Here's the test code that caused it:

import trio
import time

start = 0

class dummy_driver:
    def get_handler(self, msg):
        if msg == 'step':
            return self.run_step

    def run_step(self, ext_comms, send_resp):
        time.sleep(2)
        # trio.sleep(2)
        print(f"handler(): sending response, time = {int(time.time())-start}")
        send_resp(ext_comms, 'ok')

class dummy_io:
    def get(self):
        time.sleep(2)
        return 'step'

    def put(self, resp):
        time.sleep(1)
        print(f"received resp {resp}")


# reads msg from the comms channel from TEAL and sends it back to main driver thread
def read_msg(ext_comms, msg_in: trio.MemoryReceiveChannel, msg_out: trio.MemorySendChannel):
    while True:
        msg = ext_comms.get()    # blocks
        trio.from_thread.run(msg_out.send, msg)
        try:
            die = msg_in.receive_nowait()
            if die:
                print(f"read_msg: terminating, time = {int(time.time())-start}")
                return
        except trio.WouldBlock as e:
            pass


def send_resp(ext_comms, resp):
    if resp:
        ext_comms.put(resp)


async def async_main():
    driver = dummy_driver()
    ext_comms = dummy_io()

    send, receive = trio.open_memory_channel(0)
    send2, receive2 = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        count = 0
        nursery.start_soon(trio.to_thread.run_sync, read_msg, ext_comms, receive2, send)
        while True:
            msg = await receive.receive()
            fn = driver.get_handler(msg)
            print(f"main: assign item to handler, time = {int(time.time())-start}")
            nursery.start_soon(trio.to_thread.run_sync, fn, ext_comms, send_resp)
            count += 1
            if count == 5:
                print(f"main: terminating, time = {int(time.time())-start}")
                await send2.send(True)

if __name__ == "__main__":
    start = int(time.time())
    trio.run(async_main)
    print(f"main: terminated, time = {int(time.time())-start}")

This happened when count reached 5 and read_msg() returned.

@jpatel624
Author

Python 3.11.5

@TeamSpen210
Contributor

I'm not certain, but I suspect your issue might be with the msg_in.receive_nowait() call in read_msg(). Trio's functions in general aren't thread-safe - that allows them to be fast, but it means you'll need to use trio.from_thread.run_sync there.

@jpatel624
Author

jpatel624 commented Nov 14, 2023

It didn't matter; I got the same error. Here are some additional exceptions that I forgot to post earlier:

received resp ok, time = 11
Exception while delivering result of thread
Traceback (most recent call last):
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_thread_cache.py", line 169, in _handle_job
    deliver(result)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_threads.py", line 377, in deliver_worker_fn_result
    current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 211, in run_sync_soon
    self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 139, in run_sync_soon
    self.wakeup.wakeup_thread_and_signal_safe()
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_wakeup_socketpair.py", line 35, in wakeup_thread_and_signal_safe
    self.write_sock.send(b"\x00")
OSError: [WinError 10038] An operation was attempted on something that is not a socket
handler(): sending response, time = 12
received resp ok, time = 13
Exception while delivering result of thread
Traceback (most recent call last):
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_thread_cache.py", line 169, in _handle_job
    deliver(result)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_threads.py", line 377, in deliver_worker_fn_result
    current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 211, in run_sync_soon
    self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_entry_queue.py", line 139, in run_sync_soon
    self.wakeup.wakeup_thread_and_signal_safe()
  File "C:\cygwin64\home\JayeshPatel\SIEGE\.myenv\Lib\site-packages\trio\_core\_wakeup_socketpair.py", line 35, in wakeup_thread_and_signal_safe
    self.write_sock.send(b"\x00")
OSError: [WinError 10038] An operation was attempted on something that is not a socket

Hope it helps.

@oremanj
Member

oremanj commented Nov 14, 2023

I can reproduce your failure with the original code you posted, but I don't see it anymore after the fix that @TeamSpen210 suggested. Specifically, I changed die = msg_in.receive_nowait() to die = trio.from_thread.run_sync(msg_in.receive_nowait). I also added a break after await send2.send(True), because otherwise it doesn't terminate (it will loop around and block in await receive.receive() because the read_msg thread has exited so no one is feeding that channel anymore).

@jpatel624
Author

I can confirm that the fix suggested by @TeamSpen210 works on Ubuntu 22.04 running Python 3.10.12 and the same version of trio, but not on Windows 11, where the error was originally seen.

@TeamSpen210
Contributor

One other thing you could do is simplify the code a bit by getting rid of send2/receive2. You can close() the receive side of the channel, which will make the send side immediately raise an exception. Also read_msg() might be better if it was an async function itself, just use to_thread.run_sync() with ext_comms.get(). Then you don't have to worry about any of this.

@A5rocks
Contributor

A5rocks commented Nov 15, 2023

I personally cannot reproduce after applying @TeamSpen210's fix on Windows 11, running trio as of dfa5576, and CPython 3.11.4.

At least I assume it's supposed to hang after this output?:

main: assign item to handler, time = 2
main: assign item to handler, time = 4
handler(): sending response, time = 4
received resp ok
handler(): sending response, time = 6
main: assign item to handler, time = 6
received resp ok
handler(): sending response, time = 8
main: assign item to handler, time = 8
received resp ok
handler(): sending response, time = 10
main: assign item to handler, time = 10
main: terminating, time = 10
read_msg: terminating, time = 10
received resp ok
handler(): sending response, time = 12
received resp ok

If so, then maybe the failure that persists after that fix is something machine-specific?

As for the original issue, I haven't looked at the source code at all but maybe there's some cheap way to check for concurrency and error in most cases? (without a lock, obviously...)

@richardsheridan
Contributor

Incidentally, the thread-unsafe operation you performed was the check for a termination message, which is no longer really necessary in trio>=0.23 if you call trio.from_thread.run regularly:

import trio
import time

start = 0

class dummy_driver:
    def get_handler(self, msg):
        if msg == 'step':
            return self.run_step

    def run_step(self, ext_comms, send_resp):
        time.sleep(2)
        # trio.sleep(2)
        print(f"handler(): sending response, time = {int(time.time())-start}")
        send_resp(ext_comms, 'ok')

class dummy_io:
    def get(self):
        time.sleep(2)
        return 'step'

    def put(self, resp):
        time.sleep(1)
        print(f"received resp {resp}")


# reads msg from the comms channel from TEAL and sends it back to main driver thread
def read_msg(ext_comms, msg_in: trio.MemoryReceiveChannel, msg_out: trio.MemorySendChannel):
    try:
        while True:
            msg = ext_comms.get()    # blocks
            # This line will notice cancellation and end the thread
            trio.from_thread.run(msg_out.send, msg)
    finally:
        print(f"read_msg: terminating, time = {int(time.time())-start}")


def send_resp(ext_comms, resp):
    if resp:
        ext_comms.put(resp)


async def async_main():
    driver = dummy_driver()
    ext_comms = dummy_io()

    send, receive = trio.open_memory_channel(0)
    send2, receive2 = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        count = 0
        nursery.start_soon(trio.to_thread.run_sync, read_msg, ext_comms, receive2, send)
        while True:
            msg = await receive.receive()
            fn = driver.get_handler(msg)
            print(f"main: assign item to handler, time = {int(time.time())-start}")
            nursery.start_soon(trio.to_thread.run_sync, fn, ext_comms, send_resp)
            count += 1
            if count == 5:
                print(f"main: terminating, time = {int(time.time())-start}")
                # just cancel everything to exit
                nursery.cancel_scope.cancel()

if __name__ == "__main__":
    start = int(time.time())
    trio.run(async_main)
    print(f"main: terminated, time = {int(time.time())-start}")

@oremanj
Member

oremanj commented Nov 15, 2023

> At least I assume it's supposed to hang after this output?:

In my tests it did that unless I added a break after the operation in async_main that sends the termination request. I assume that's a bug in the reduced example that wasn't noticed because execution didn't reach that point before.

> As for the original issue, I haven't looked at the source code at all but maybe there's some cheap way to check for concurrency and error in most cases? (without a lock, obviously...)

I think it would already fail when receive_nowait calls trio.lowlevel.reschedule(): reschedule won't be able to find the runner via GLOBAL_RUN_CONTEXT, because that context is thread-local. It's not obvious to me what caused the TrioInternalError here, or whether we could guard against it.

@jpatel624
Author

Thanks @TeamSpen210, that solves the problem for me. Please feel free to close the issue (or let me know if you want me to close it) if you don't need it to resolve the trio.TrioInternalError.

> One other thing you could do is simplify the code a bit by getting rid of send2/receive2. You can close() the receive side of the channel, which will make the send side immediately raise an exception. Also read_msg() might be better if it was an async function itself, just use to_thread.run_sync() with ext_comms.get(). Then you don't have to worry about any of this.
