"RuntimeError: is bound to a different event loop" when using prefect client from sync flow in kubernetes #13181

Open
4 tasks done
carlosjourdan opened this issue Apr 30, 2024 · 2 comments
Labels
bug Something isn't working

carlosjourdan commented Apr 30, 2024

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When trying to run Prefect client async methods from a sync context on Kubernetes, I get a "RuntimeError: is bound to a different event loop" error. I've tried using prefect.utilities.asyncutils.run_sync, asyncio.run and asyncio.get_event_loop().run_until_complete.

The code works fine locally, and calling a simple async function on the k8s deployment also works. I can only reproduce the error in the k8s deployment, and only with the prefect.client methods.

Reproduction

import asyncio

from prefect import flow
from prefect.context import get_run_context
from prefect.deployments import Deployment
from prefect.utilities.asyncutils import run_sync


@flow()
def flow2():
    # Works fine locally, but raises the RuntimeError on k8s.
    context = get_run_context()
    prefect_client = context.client
    flow_run_id = context.flow_run.id

    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
    print(flow_run_state)


@flow()
def flow1():
    # Works fine both locally and on k8s.
    run_sync(asyncio.sleep(10))


if __name__ == "__main__":
    # Both deployments use the same work pool and image.
    Deployment.build_from_flow(
        flow=flow1,
        name="flow1",
        path="/source",
        work_pool_name="my-workpool",
        job_variables={"image": "my-image"},
    ).apply()
    Deployment.build_from_flow(
        flow=flow2,
        name="flow2",
        path="/source",
        work_pool_name="my-workpool",
        job_variables={"image": "my-image"},
    ).apply()

The image basically builds from python:3.11-buster, installs some packages and copies the source files to the /source dir.

This is the result of a pip freeze:

aiosqlite==0.20.0
alembic==1.13.1
altair==4.2.2
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asgi-lifespan==2.1.0
asn1crypto==1.5.1
asttokens==2.4.1
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
azure-common==1.1.28
azure-core==1.30.1
azure-identity==1.14.1
azure-keyvault-secrets==4.7.0
beautifulsoup4==4.12.3
bleach==6.1.0
boto3==1.34.95
botocore==1.34.95
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
comm==0.2.2
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
debugpy==1.8.1
decorator==5.1.1
defusedxml==0.7.1
dependency-injector==4.41.0
dnspython==2.6.1
docker==6.1.3
elastic-transport==8.1.2
elasticsearch==8.2.3
email-validator==2.1.1
entrypoints==0.4
executing==2.0.1
fastjsonschema==2.19.1
filelock==3.14.0
fqdn==1.5.1
fsspec==2024.3.1
fuzzywuzzy==0.18.0
geographiclib==2.0
geopy==2.4.1
google-auth==2.29.0
graphviz==0.20.3
great-expectations==0.15.32
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
ijson==3.2.3
importlib-metadata==7.1.0
importlib-resources==6.1.3
ipykernel==6.29.4
ipython==8.24.0
ipython-genutils==0.2.0
ipywidgets==8.1.2
isodate==0.6.1
isoduration==20.11.0
itsdangerous==2.2.0
jedi==0.19.1
jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jmespath==1.0.1
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.7.2
jupyter-client==7.4.9
jupyter-core==5.7.2
jupyter-events==0.6.3
jupyter-server==2.10.0
jupyter-server-terminals==0.5.3
jupyterlab-pygments==0.3.0
jupyterlab-widgets==3.0.10
kubernetes==29.0.0
lazy-import==0.2.2
mailbits==0.2.1
makefun==1.15.2
mako==1.3.3
markdown==3.6
markdown-it-py==3.0.0
markupsafe==2.1.5
marshmallow==3.21.1
matplotlib-inline==0.1.7
mdurl==0.1.2
mistune==3.0.2
msal==1.23.0
msal-extensions==1.1.0
nbclassic==1.0.0
nbclient==0.10.0
nbconvert==7.16.4
nbformat==5.10.4
nest-asyncio==1.6.0
notebook==6.5.6
notebook-shim==0.2.4
nslookup==1.7.0
numpy==1.24.4
oauthlib==3.2.2
orjson==3.10.1
overrides==7.7.0
packaging==24.0
pandas==2.1.4
pandocfilters==1.5.1
parso==0.8.4
pathspec==0.12.1
pendulum==2.1.2
platformdirs==4.2.1
portalocker==2.8.2
prefect==2.18.1
prometheus-client==0.20.0
prompt-toolkit==3.0.43
psutil==5.9.8
pure-eval==0.2.2
pyasn1==0.6.0
pyasn1-modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic-core==2.18.2
pygments==2.17.2
pyjwt==2.8.0
pyopenssl==20.0.1
pyparsing==3.1.2
pypi-simple==1.5.0
pypyodbc==1.3.6
pyrsistent==0.20.0
pysmbclient==0.1.5
pyspnego==0.10.2
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-json-logger==2.0.7
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
pywinpty==2.0.13
pyyaml==6.0.1
pyzmq==24.0.1
readchar==4.0.6
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rich==13.7.1
rsa==4.9
ruamel-yaml==0.17.17
s3transfer==0.10.1
scipy==1.13.0
selenium==3.141.0
send2trash==1.8.3
setuptools==69.5.1
shellingham==1.5.4
six==1.14.0
smbprotocol==1.6.2
sniffio==1.3.1
snowflake-connector-python==3.10.0
snowflake-sqlalchemy==1.5.3
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sspilib==0.1.0
stack-data==0.6.3
termcolor==2.0.1
terminado==0.18.1
text-unidecode==1.3
tinycss2==1.3.0
toml==0.10.2
tomlkit==0.12.4
toolz==0.12.1
tornado==6.4
tqdm==4.66.2
traitlets==5.14.3
truststore==0.7.0
typer==0.12.3
types-python-dateutil==2.9.0.20240316
typing-extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
unidecode==1.3.8
uri-template==1.3.0
urllib3==1.26.18
uvicorn==0.28.1
vertica-python==1.0.5
wcwidth==0.2.13
webcolors==1.13
webdriver-manager==4.0.1
webencodings==0.5.1
websocket-client==1.8.0
websockets==12.0
widgetsnbextension==4.0.10
zipp==3.18.1


Error

```python3
Encountered exception during execution:
Traceback (most recent call last):
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 875, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/flows/my_flows.py", line 16, in flow2
    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 115, in run_sync
    return asyncio.run(coroutine)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2007, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1801, in get
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 325, in send
    response = await self._send_with_retry(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 249, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 185, in handle_async_request
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 148, in handle_async_request
    status, headers = await self._receive_response(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 292, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 333, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 361, in _receive_events
    events = await self._read_incoming_data(request)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 452, in _read_incoming_data
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 438, in _read_incoming_data
    data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 35, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 196, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 138, in _call_sslobject_method
    data = await self.transport_stream.receive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1203, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.11/asyncio/locks.py", line 210, in wait
    fut = self._get_loop().create_future()
          ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError:  is bound to a different event loop
```

Versions

# Locally

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.8
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         cloud

# On k8s

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.4
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Additional context

No response

carlosjourdan added the bug and needs:triage labels on Apr 30, 2024
Contributor

zzstoatzz commented Apr 30, 2024

hi @carlosjourdan - thanks for the issue! there are a couple things about this example code that you could try updating

  • i would use get_client() to get a client instead of using the context's client. you should have no client lifetime issues like the one you seem to be hitting here
  • i would make the above code async, since you're already using asyncio.sleep and async client methods. the run_sync is adding complexity that may not be warranted here

zzstoatzz removed the needs:triage label on May 1, 2024

Author

carlosjourdan commented

Thanks @zzstoatzz.

To give some more context, I'm building a library that will be used by researchers at a financial institution. All their code is sync, hence the need to avoid exposing any async functions. I introduced the asyncio.sleep in the example just to show that run_sync works fine with it, but fails with the Prefect client.

Following your suggestion to use get_client(), I was able to make the code work, but I still see some things that could be addressed from this issue:

  • It's not clear from the docs why or when I shouldn't use the RunContext.client attribute to call the backend. In my actual code, I was already calling get_run_context() to access some other information, so it seemed natural to use the client attribute it contains
  • get_client() has a sync_client parameter that would seem ideal for my use case, but the returned sync client exposes far fewer methods than its async counterpart
  • Even with get_client(), if I executed several API calls, each wrapped in its own run_sync call, I'd still get some "Event loop is closed" errors. The solution was to create a single async function that calls get_client(), awaits each API call, and is itself wrapped in a single run_sync call.
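
The workaround in the last bullet can be sketched as follows. This is an illustration under stated assumptions, not a documented Prefect pattern: `FakeClient` and `fake_get_client` are hypothetical stand-ins so the example runs without a Prefect server, the `client_factory` parameter is an invention of this sketch, and `asyncio.run` stands in for `run_sync`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable, Dict, List


class FakeClient:
    """Hypothetical stand-in for an async API client (not Prefect's class)."""

    async def read_flow_run(self, flow_run_id: str) -> Dict[str, str]:
        await asyncio.sleep(0)  # pretend to do network I/O
        return {"id": flow_run_id, "state": "RUNNING"}


@asynccontextmanager
async def fake_get_client() -> AsyncIterator[FakeClient]:
    """Hypothetical factory mimicking `async with get_client() as client`."""
    yield FakeClient()


async def _read_flow_runs(
    client_factory: Callable, flow_run_ids: List[str]
) -> List[Dict[str, str]]:
    # The client is created, used for every call, and closed inside one loop.
    async with client_factory() as client:
        return [await client.read_flow_run(i) for i in flow_run_ids]


def read_flow_runs(
    flow_run_ids: List[str], client_factory: Callable = fake_get_client
) -> List[Dict[str, str]]:
    """Sync facade: a single event loop serves the whole batch of API calls."""
    return asyncio.run(_read_flow_runs(client_factory, flow_run_ids))


if __name__ == "__main__":
    print(read_flow_runs(["a", "b"]))
```

The sync wrapper is the only function that library users would see. Because the client never outlives the loop that created it, no loop-bound state is carried from one call to the next. Swapping `fake_get_client` for Prefect's `get_client` is the intended use, but that combination was not tested here.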
