Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decorator like @subdag that works with async functions #903

Closed
kpounder opened this issue May 13, 2024 · 7 comments · Fixed by #905
Closed

Decorator like @subdag that works with async functions #903

kpounder opened this issue May 13, 2024 · 7 comments · Fixed by #905

Comments

@kpounder
Copy link

Is your feature request related to a problem? Please describe.
The workflow I'm currently considering migrating to Hamilton relies heavily on async functions and is a good use case for using something like @subdag to reuse certain code at different parts in the overall DAG. The problem is that @subdag does not work with sub-DAGs that contain async functions.

Describe the solution you'd like
Maybe a @subdag_async that works like @subdag

Describe alternatives you've considered
Trying now to create an async driver within the main driver to execute the sub-DAG async -- but would still prefer @subdag_async for developer UX, better overall DAG visualization, etc.

Additional context
N/A

@elijahbenizzy
Copy link
Collaborator

Is your feature request related to a problem? Please describe. The workflow I'm currently considering migrating to Hamilton relies heavily on async functions and is a good use case for using something like @subdag to reuse certain code at different parts in the overall DAG. The problem is that @subdag does not work with sub-DAGs that contain async functions.

Describe the solution you'd like Maybe a @subdag_async that works like @subdag

Describe alternatives you've considered Trying now to create an async driver within the main driver to execute the sub-DAG async -- but would still prefer @subdag_async for developer UX, better overall DAG visualization, etc.

Additional context N/A

Hey! I think we should be able to just get subdag to work. That said, do you have a minimum reproduction? Curious if you're seeing the same issue as I am, or a different one:

from hamilton.function_modifiers import subdag
from hamilton.experimental.h_async import AsyncDriver
from hamilton.ad_hoc_utils import create_temporary_module

async def foo() -> int:
    return 1

@subdag(
    foo
)
async def subdag_fn(foo: int) -> int:
    return foo

mod = create_temporary_module(subdag_fn)
dr = AsyncDriver({}, mod)
await dr.execute(["subdag_fn"])
image

Basically this is not realizing it is async, and thus not working.

@skrawcz
Copy link
Collaborator

skrawcz commented May 13, 2024

@elijahbenizzy yep sorry didn't add context, chatted offline with @kpounder. But yes subdag assumes everything is sync -- haven't scoped what's required.

If you have an actual DAG the error is somewhere in subdag because it gets a co-routine result, not what it was actually expecting.

@skrawcz
Copy link
Collaborator

skrawcz commented May 13, 2024

Using the async fast api example:

# async_subdag.py
import async_module
import fastapi
from hamilton.experimental import h_async
from hamilton import base
from hamilton.function_modifiers import subdag, source

@subdag(
    async_module,
    inputs={"request": source("request")},
)
async def my_subdag_with_decorator(pipeline: dict) -> dict:
    return pipeline
import async_subdag
dr = h_async.AsyncDriver({}, async_subdag, result_builder=base.DictResult())

@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
    """Handler for pipeline call"""
    input_data = {"request": request}
    result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
    return result

Stacktrace:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 404, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 762, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 782, in app
    await route.handle(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 297, in handle
    await self.app(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 77, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 72, in app
    response = await func(request)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 299, in app
    raise e
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 294, in app
    raw_response = await run_endpoint_function(
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
    return await dependant.call(**values)
  File "hamilton/examples/async/fastapi_example.py", line 21, in call
    result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
  File "hamilton/hamilton/experimental/h_async.py", line 173, in execute
    raise e
  File "hamilton/hamilton/experimental/h_async.py", line 164, in execute
    outputs = await self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
  File "hamilton/hamilton/experimental/h_async.py", line 136, in raw_execute
    return await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 71, in new_fn
    return fn(**fn_kwargs)
  File "hamilton/hamilton/function_modifiers/recursive.py", line 348, in fn
    return _callabl(**new_kwargs)
  File "hamilton/examples/async/async_module.py", line 20, in foo
    return request_raw.get("foo", "far")
AttributeError: 'coroutine' object has no attribute 'get'

@elijahbenizzy
Copy link
Collaborator

Using the async fast api example:

# async_subdag.py
import async_module
import fastapi
from hamilton.experimental import h_async
from hamilton import base
from hamilton.function_modifiers import subdag, source

@subdag(
    async_module,
    inputs={"request": source("request")},
)
async def my_subdag_with_decorator(pipeline: dict) -> dict:
    return pipeline
import async_subdag
dr = h_async.AsyncDriver({}, async_subdag, result_builder=base.DictResult())

@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
    """Handler for pipeline call"""
    input_data = {"request": request}
    result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
    return result

Stacktrace:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 404, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 762, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 782, in app
    await route.handle(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 297, in handle
    await self.app(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 77, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/starlette/routing.py", line 72, in app
    response = await func(request)
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 299, in app
    raise e
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 294, in app
    raw_response = await run_endpoint_function(
  File ".pyenv/versions/my-venv/lib/python3.9/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
    return await dependant.call(**values)
  File "hamilton/examples/async/fastapi_example.py", line 21, in call
    result = await dr.execute(["my_subdag_with_decorator"], inputs=input_data)
  File "hamilton/hamilton/experimental/h_async.py", line 173, in execute
    raise e
  File "hamilton/hamilton/experimental/h_async.py", line 164, in execute
    outputs = await self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
  File "hamilton/hamilton/experimental/h_async.py", line 136, in raw_execute
    return await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 68, in new_fn
    fn_kwargs = await await_dict_of_tasks(task_dict)
  File "hamilton/hamilton/experimental/h_async.py", line 20, in await_dict_of_tasks
    coroutines_gathered = await asyncio.gather(*coroutines)
  File "hamilton/hamilton/experimental/h_async.py", line 33, in process_value
    return await val
  File "hamilton/hamilton/experimental/h_async.py", line 71, in new_fn
    return fn(**fn_kwargs)
  File "hamilton/hamilton/function_modifiers/recursive.py", line 348, in fn
    return _callabl(**new_kwargs)
  File "hamilton/examples/async/async_module.py", line 20, in foo
    return request_raw.get("foo", "far")
AttributeError: 'coroutine' object has no attribute 'get'

I'm hoping it should be as simple as adding alternate implementations to:

  1. def new_function(**kwargs):

E.G. switching these to async def when the function is async would (hopefully) solve it.

I also want to add a little more tooling to the node calass (E.G. it should know if its async), so we can do this more ergonomically.

elijahbenizzy added a commit that referenced this issue May 14, 2024
This just adds another path for the two functions we redefine. We need
to add tests but this works on a manual test for now.

See #903
@elijahbenizzy
Copy link
Collaborator

@kpounder -- this was easy (if I got it right) -- see #905.

I've published an RC version, mind giving your code a spin under 1.62.0rc0? The nice thing is you can just use @subdag, you don't need another decorator. This fixes @skrawcz's example but I haven't added comprehensive testing yet.

Note there are a few decorators that might not work with async, let us know if you find more and we'll fix them too!

elijahbenizzy added a commit that referenced this issue May 14, 2024
This just adds another path for the two functions we redefine. We need
to add tests but this works on a manual test for now.

See #903
@kpounder
Copy link
Author

@elijahbenizzy it seems to work! 🎉 will do a bit more playing around with it, but thank you very much

elijahbenizzy added a commit that referenced this issue May 14, 2024
This just adds another path for the two functions we redefine. We need
to add tests but this works on a manual test for now.

See #903
elijahbenizzy added a commit that referenced this issue May 14, 2024
This just adds another path for the two functions we redefine. We need
to add tests but this works on a manual test for now.

See #903
elijahbenizzy added a commit that referenced this issue May 14, 2024
This just adds another path for the two functions we redefine. We need
to add tests but this works on a manual test for now.

See #903
@skrawcz
Copy link
Collaborator

skrawcz commented May 14, 2024

Closing -- releasing this soon.

@skrawcz skrawcz closed this as completed May 14, 2024
@skrawcz skrawcz linked a pull request May 14, 2024 that will close this issue
7 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants