I used the GitHub search to find a similar issue and didn't find it.
I searched the Prefect documentation for this issue.
I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When trying to run Prefect client async methods from a sync context on Kubernetes, I get a "RuntimeError: is bound to a different event loop" error. I've tried using prefect.utilities.asyncutils.run_sync, asyncio.run and asyncio.get_event_loop().run_until_complete.
The code works fine locally, and calling a simple async on the k8s deployment also works. I'm only able to reproduce the error in the k8s deployment with the prefect.client methods.
Reproduction

    from prefect.utilities.asyncutils import run_sync
    from prefect.context import get_run_context
    from prefect import flow
    from prefect.deployments import Deployment
    import asyncio


    @flow()
    def flow2():
        # This method works fine locally, but crashes on k8s
        context = get_run_context()
        prefect_client = context.client
        flow_run_id = context.flow_run.id
        flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
        print(flow_run_state)


    @flow()
    def flow1():
        # This works fine locally and on k8s
        run_sync(asyncio.sleep(10))


    if __name__ == "__main__":
        Deployment.build_from_flow(flow=flow1, name="flow1", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()
        Deployment.build_from_flow(flow=flow2, name="flow2", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()

The image basically builds from python:3.11-buster, installs some packages and copies the source files to the /source dir.
### Error
```python3
Encountered exception during execution:
Traceback (most recent call last):
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 875, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/flows/my_flows.py", line 16, in flow2
flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 115, in run_sync
return asyncio.run(coroutine)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2007, in read_flow_run
response = await self._client.get(f"/flow_runs/{flow_run_id}")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1801, in get
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1574, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 325, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 249, in _send_with_retry
response = await send(request, *send_args, **send_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
raise exc from None
File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
response = await connection.handle_async_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
return await self._connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 185, in handle_async_request
raise exc
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 148, in handle_async_request
status, headers = await self._receive_response(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 292, in _receive_response
event = await self._receive_stream_event(request, stream_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 333, in _receive_stream_event
await self._receive_events(request, stream_id)
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 361, in _receive_events
events = await self._read_incoming_data(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 452, in _read_incoming_data
raise exc
File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 438, in _read_incoming_data
data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 35, in read
return await self._stream.receive(max_bytes=max_bytes)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 196, in receive
data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 138, in _call_sslobject_method
data = await self.transport_stream.receive()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1203, in receive
await self._protocol.read_event.wait()
File "/usr/local/lib/python3.11/asyncio/locks.py", line 210, in wait
fut = self._get_loop().create_future()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/mixins.py", line 20, in _get_loop
raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: is bound to a different event loop
```
hi @carlosjourdan - thanks for the issue! there are a couple things about this example code that you could try updating
- i would use get_client() to get a client instead of using the context's client. you should have no client lifetime issues like the one you seem to be hitting here
- i would make the above code async, since you're already using asyncio.sleep and async client methods. the run_sync is adding complexity that may not be warranted here
To give some more context, I'm building a library that will be used by researchers at a financial institution. All their code is sync, hence the need to avoid exposing any async functions. I introduced the asyncio.sleep in the example just to show that run_sync works fine with it, but fails with the Prefect client.
Following your suggestion to use get_client(), I was able to make the code work, but I still see some things that could be addressed from this issue:
- It's not clear from the docs why or when I shouldn't use the RunContext.client attribute to call the backend. In my actual code, I was already calling get_run_context() to access some other information, so it seemed natural to use the client attribute it contains.
- get_client() has a sync_client option that would seem ideal for my use case, but the returned sync client exposes far fewer methods than its async counterpart.
- Even with get_client(), if I executed several API calls, each one wrapped in its own run_sync call, I'd still get some "Event loop is closed" errors. The solution was to create one single async method that calls get_client(), awaits each API call, and then wrap that method with a single run_sync call.
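
A minimal sketch of that last workaround, under a few assumptions: the helper names and the `client_factory` parameter are hypothetical (the parameter exists only so the helper can be tried without a Prefect API), `read_flow` is just an example of a second call, and `asyncio.run` is used directly because, per the traceback above, that is what `run_sync` ends up calling here.

```python
import asyncio


async def _read_run_and_flow(flow_run_id, client_factory):
    """Make every API call inside one coroutine, so they all share one event loop."""
    async with client_factory() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        flow = await client.read_flow(flow_run.flow_id)
    return flow_run, flow


def read_run_and_flow(flow_run_id, client_factory=None):
    """Sync entry point: a single asyncio.run() call covers all the awaits."""
    if client_factory is None:
        # Imported lazily so the helper can be exercised with a stand-in factory.
        from prefect import get_client

        client_factory = get_client
    return asyncio.run(_read_run_and_flow(flow_run_id, client_factory))
```

Inside a sync flow this would be called as `flow_run, flow = read_run_and_flow(get_run_context().flow_run.id)`. The client is opened and closed within the same loop that runs the requests, so nothing outlives the loop it was created on. Note that `asyncio.run` raises if the calling thread already has a running event loop, so this only fits a truly sync call site.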