
Asynchronous context manager crash with langchain-openai >= 0.1.2 #9077

Closed
backmask opened this issue Apr 24, 2024 · 9 comments · Fixed by #9321

@backmask

Summary of problem

ddtrace breaks streaming with langchain-openai >= 0.1.2

Which version of dd-trace-py are you using?

2.8.2

Which version of pip are you using?

23.3.1

How can we reproduce your problem?

The following code will crash:

from langchain_openai import ChatOpenAI

async for message in ChatOpenAI(...).astream("My prompt"):
    print(message)

With the following exception:

Traceback (most recent call last):
  File "/.venv/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 293, in astream
    async for chunk in self._astream(
  File "/.venv/lib/python3.12/site-packages/langchain_openai/chat_models/base.py", line 560, in _astream
    async with response:
TypeError: 'async_generator' object does not support the asynchronous context manager protocol

The root cause appears to be in the way ChatOpenAI is patched. The langchain-openai code was recently updated to use context managers:

response = await self.async_client.create(messages=message_dicts, **params)
async with response:
  ...

(PR: https://github.com/langchain-ai/langchain/pull/18013/files#diff-197e34e7a297cef90049a95d035073870fb484896bab67f800a95fe190ab02bfR560)

However, the tracer replaces the stream with a type that does not implement the asynchronous context manager protocol.
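
For context, `async with` requires the object to implement `__aenter__`/`__aexit__`, which a plain async generator does not, and that mismatch is exactly what the traceback reports. A minimal, self-contained illustration (these names are made up and are not ddtrace or langchain internals):

```python
import asyncio

async def chunk_generator():
    # roughly what the patched client hands back: a bare async generator
    yield "chunk"

class StreamLike:
    # roughly what langchain-openai expects: something usable with "async with"
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # no real cleanup in this sketch

    def __aiter__(self):
        return chunk_generator()

async def main():
    async with StreamLike() as stream:   # works: implements __aenter__/__aexit__
        async for chunk in stream:
            print(chunk)
    try:
        async with chunk_generator():    # fails: async generators are not
            pass                         # async context managers
    except TypeError as err:
        print(err)

asyncio.run(main())
```

Running this prints the chunk from the first block and then the same TypeError message shown above for the second.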

@emmettbutler
Collaborator

@Yun-Kim

@mintuhouse

Adding another stack trace (not using async):

ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
  |     yield
  |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 190, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/.cache/venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/.cache/venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
    |     result = await app(  # type: ignore[func-returns-value]
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    |     return await self.app(scope, receive, send)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/applications.py", line 123, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/ddtrace/contrib/asgi/middleware.py", line 290, in __call__
    |     return await self.app(scope, receive, wrapped_send)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
    |     raise exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
    |     await self.app(scope, receive, _send)
    |   File "/.cache/venv/lib/python3.12/site-packages/asgi_correlation_id/middleware.py", line 90, in __call__
    |     await self.app(scope, receive, handle_outgoing_request)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 189, in __call__
    |     with collapse_excgroups():
    |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    |     self.gen.throw(value)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
    |     raise exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 191, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/mlai/src/config/logger.py", line 28, in log_request
    |     return await call_next(request)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 165, in call_next
    |     raise app_exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 151, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/trustedhost.py", line 53, in __call__
    |     await self.app(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 189, in __call__
    |     with collapse_excgroups():
    |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    |     self.gen.throw(value)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
    |     raise exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 191, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/mlai/src/middleware/authentication.py", line 41, in authentication
    |     response = await call_next(request)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 165, in call_next
    |     raise app_exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/base.py", line 151, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
    |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    |     raise exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/routing.py", line 756, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/routing.py", line 776, in app
    |     await route.handle(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/routing.py", line 297, in handle
    |     await self.app(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/routing.py", line 77, in app
    |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    |     raise exc
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/.cache/venv/lib/python3.12/site-packages/starlette/routing.py", line 72, in app
    |     response = await func(request)
    |                ^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/fastapi/routing.py", line 278, in app
    |     raw_response = await run_endpoint_function(
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
    |     return await dependant.call(**values)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/mlai/main.py", line 103, in decide_and_act
    |     return router.route(body)
    |            ^^^^^^^^^^^^^^^^^^
    |   File "/mlai/src/router.py", line 102, in route
    |     agent_response = self.agent_executor.invoke({"input": json.dumps(input_for_agent)})
    |                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/ddtrace/contrib/trace_utils.py", line 334, in wrapper
    |     return func(mod, pin, wrapped, instance, args, kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/ddtrace/contrib/langchain/patch.py", line 612, in traced_chain_call
    |     final_outputs = func(*args, **kwargs)
    |                     ^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 163, in invoke
    |     raise e
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 153, in invoke
    |     self._call(inputs, run_manager=run_manager)
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/agents/agent.py", line 1432, in _call
    |     next_step_output = self._take_next_step(
    |                        ^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/agents/agent.py", line 1138, in _take_next_step
    |     [
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/agents/agent.py", line 1166, in _iter_next_step
    |     output = self.agent.plan(
    |              ^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain/agents/agent.py", line 514, in plan
    |     for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 2875, in stream
    |     yield from self.transform(iter([input]), config, **kwargs)
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 2862, in transform
    |     yield from self._transform_stream_with_config(
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 1881, in _transform_stream_with_config
    |     chunk: Output = context.run(next, iterator)  # type: ignore
    |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 2826, in _transform
    |     for output in final_pipeline:
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 1282, in transform
    |     for ichunk in input:
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 4736, in transform
    |     yield from self.bound.transform(
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/runnables/base.py", line 1300, in transform
    |     yield from self.stream(final, config, **kwargs)
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 249, in stream
    |     raise e
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_core/language_models/chat_models.py", line 229, in stream
    |     for chunk in self._stream(messages, stop=stop, **kwargs):
    |   File "/.cache/venv/lib/python3.12/site-packages/langchain_openai/chat_models/base.py", line 480, in _stream
    |     with self.client.create(messages=message_dicts, **params) as response:
    | TypeError: 'generator' object does not support the context manager protocol
    +------------------------------------

@mintuhouse

mintuhouse commented May 6, 2024

By the way, there is a pending PR reverting the change linked above (not sure if it will get merged, though).

Update: the above PR won't be merged. Can this be fixed?

@theseus905

Will a fix for this be included in v2.9.0?

@mintuhouse

Tagging @Yun-Kim for visibility since I see you in most of the openai-related commits.

Please retag if you know who to ask.

@Yun-Kim
Contributor

Yun-Kim commented May 20, 2024

Thanks for raising the issue, and apologies for the late response.

We're working on getting a fix out ASAP. In the meantime, as a workaround, I recommend pinning your version of langchain-openai to <0.1.2.
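
For example, in a requirements file the pin could look like this (a sketch; adapt it to whatever dependency manager you use):

```
# temporary workaround until the ddtrace fix ships
langchain-openai<0.1.2
```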

@dariatsenter

Confirming this is happening for:

langchain = "^0.1.16"
ddtrace = "^2.3.1"

Yun-Kim added a commit that referenced this issue May 21, 2024
Fixes #9077.

This PR changes the way we patch streamed completion/chat responses in
the OpenAI integration. Previously we returned generator functions that
yielded the value of each chunk. However, this was incorrect, as
`OpenAIStream/OpenAIAsyncStream` objects can be used as context managers
as of `openai>=1.6.0`, which means that replacing an `OpenAIStream` with
a generator function would break user applications that try to access
stream responses as context managers:

```python
with openai.OpenAI().completions.create(..., stream=True) as response:
    for chunk in response:
        print(chunk)
```
There is no new or changed functionality other than the fact that we now
return a wrapped `TracedOpenAIStream` object instead of the previous
traced generator function.
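
Not the actual `TracedOpenAIStream` implementation, just a sketch of the async side of that wrapper approach, with illustrative names: the wrapper delegates both iteration and the context manager protocol to the underlying stream instead of replacing it with a generator.

```python
class TracedAsyncStreamSketch:
    """Wraps an async stream while preserving ``async with`` and ``async for``."""

    def __init__(self, stream, span):
        self._stream = stream
        self._span = span  # tracing span to finish once the stream is closed

    async def __aenter__(self):
        await self._stream.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        try:
            return await self._stream.__aexit__(exc_type, exc, tb)
        finally:
            self._span.finish()

    def __aiter__(self):
        return self._gen()

    async def _gen(self):
        async for chunk in self._stream:
            # chunk-level tagging would happen here
            yield chunk
```

Because `__aenter__`, `__aexit__`, and `__aiter__` all forward to the wrapped stream, code like `async with response:` keeps working under instrumentation.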

Note: this PR may seem much larger than it really is; we also updated the
tested versions of OpenAI, so the riot requirement lockfiles have changed.

@mattflo

mattflo commented May 22, 2024

Thanks for getting a fix for this, @Yun-Kim!

Just to point out: I ran into this with langchain, but it reproduces with just openai and ddtrace:

ddtrace                                 2.8.5          Datadog APM client...
langchain-openai                        0.1.2          An integration pac...
openai                                  1.30.1         The official Pytho...

The following test illustrates the issue using openai without langchain:

import openai
import pytest

@pytest.mark.asyncio
async def test_openai_client():
    client = openai.AsyncOpenAI().chat.completions

    # this exhibits the issue only when run under ddtrace
    # TypeError: 'async_generator' object does not support the asynchronous context manager protocol
    response = await client.create(
        messages=[
            {
                "role": "user",
                "content": "what is the meaning of life?",
            }
        ],
        model="gpt-3.5-turbo-0125",
        stream=True,
    )

    # type of response:
    # without ddtrace: <class 'openai.AsyncStream'>
    # with ddtrace: <class 'async_generator'>
    print(type(response))

    async with response as resp:
        async for chunk in resp:
            print(chunk)

Failure output:

>       async with response as resp:
E       TypeError: 'async_generator' object does not support the asynchronous context manager protocol

@Yun-Kim
Contributor

Yun-Kim commented May 22, 2024

Yes, thanks for clarifying @mattflo. This is a bug in our OpenAI integration, which didn't account for the fact that OpenAI made Stream/AsyncStream objects usable as context managers starting in openai>=1.6.0. We will get this fix released in the coming weeks for 2.8 and 2.9; in the meantime, I would recommend pinning your version of openai to a previous version.
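
For reference, this is the async usage pattern the integration has to preserve now that streams are context managers (a sketch, assuming openai>=1.6.0, a valid OPENAI_API_KEY, and an arbitrary example model):

```python
import asyncio
import openai

async def main():
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=[{"role": "user", "content": "what is the meaning of life?"}],
        stream=True,
    )
    # AsyncStream is both an async iterator and an async context manager,
    # so whatever the tracer returns here must support both protocols.
    async with stream:
        async for chunk in stream:
            print(chunk)

asyncio.run(main())
```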
