
Memory leak if writing to a channel that is never read from later #384

Open

davidfstr opened this issue Oct 2, 2019 · 23 comments
@davidfstr

davidfstr commented Oct 2, 2019

It appears that if you write a message to a channel, for example via group_send, and no reader ever appears on that channel, the message will remain in the in-memory queue channels.layers.channel_layers.backends['default'].receive_buffer indefinitely when using the RedisChannelLayer backend. In particular, I have captured a server that has over 100k items in that dictionary.

One way to avoid this problem would be to extend the API for group_send with a time-to-live parameter so that messages would expire over time if they weren't read. Thoughts?

My pip freeze, in case it's useful:

channels==2.1.2
channels-redis==2.2.1
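
For reference, the buffer can be inspected at runtime with something like the following (a diagnostic sketch; it assumes the default layer has already been used and that receive_buffer maps channel names to asyncio.Queue objects, as in the channels_redis versions discussed here):

    import channels.layers

    # Diagnostic sketch: count buffered channels and unread messages in the
    # default layer. receive_buffer is an internal attribute and may change
    # between channels_redis releases.
    layer = channels.layers.get_channel_layer()
    buffer = layer.receive_buffer
    print("channels buffered:", len(buffer))
    print("messages buffered:", sum(q.qsize() for q in buffer.values()))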
@carltongibson
Member

Thoughts?

Right, yes. 🙂

  • First off I guess, we should make clearing out stale items easier.
  • Then allowing a default TTL.
  • Then allowing that per-message.

@davidfstr
Author

I have discovered that RedisChannelLayer already has a concept of a "group expiry", which we may want to hook into for the "set a default TTL" scenario:

class RedisChannelLayer(BaseChannelLayer):
    def __init__(..., group_expiry=86400, ...):
        self.group_expiry = group_expiry  # channels_redis.core:46

This is alluded to briefly in the Channels documentation:

Groups are a broadcast system that:

  • ...
  • Provides group expiry for clean-up of connections whose disconnect handler didn’t get to run (e.g. power failure)

I was unable to find documentation for changing the group_expiry of RedisChannelLayer, but reading the source for channels/layers.py:58@_make_backend suggests that I could put something like this in settings.py:

    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                'hosts': ...,
                'prefix': ...,
            'group_expiry': 86400,  # seconds; 86400 = 24 hours
            },
        },
    }

Do you think it would be appropriate to hook into the existing "group_expiry" concept, or do you think a separate "message_expiry" (or similar) would be more appropriate?

@carltongibson
Member

No, let's use what's there.

I was unable to find documentation ...

Ultimately I think this is the issue: I end up having to look at the channels_redis source to work out what the options are. So first step would be a PR explaining what's already possible.

(Maybe that'd be enough)

@davidfstr
Author

davidfstr commented Oct 2, 2019

Agreed that the documentation should be extended to show how to configure the built-in backend types.

Unfortunately group_expiry doesn't actually control expiration of messages in the receive_buffer. So there still needs to be some logic added to perform expiration.

Fix 1 Sketch

  • First off I guess, we should make clearing out stale items easier.

I propose that receive_buffer keep track of the last time a message was read from it, on a per-channel basis, and that an API (expire_unread_channels) be provided on RedisChannelLayer that can clear any channel that was not read within the last N seconds. Example usage:

channel_layer = channels.layers.get_channel_layer()
assert isinstance(channel_layer, RedisChannelLayer)
channel_layer.expire_unread_channels(ttl=300)  # seconds (5 minutes); also the proposed default
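
A rough sketch of what Fix 1 could look like (all names here are hypothetical; only receive_buffer is an existing attribute of the layer):

    import time

    # Hypothetical sketch of Fix 1: track when each buffered channel was last
    # touched by a reader and drop buffers that sat unread for too long.
    class ReceiveBufferExpiryMixin:
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._last_read_at = {}  # channel name -> monotonic timestamp

        def _note_buffered(self, channel):
            # Call when a message is first buffered for `channel`, so a brand-new
            # channel gets a full `ttl` window before it can be expired.
            self._last_read_at.setdefault(channel, time.monotonic())

        def _note_read(self, channel):
            # Call whenever a message is handed to a reader of `channel`.
            self._last_read_at[channel] = time.monotonic()

        def expire_unread_channels(self, ttl=300):
            """Drop buffered messages for channels not read in the last `ttl` seconds."""
            now = time.monotonic()
            for channel in list(self.receive_buffer):
                if now - self._last_read_at.get(channel, now) > ttl:
                    del self.receive_buffer[channel]
                    self._last_read_at.pop(channel, None)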

Fix 2 Sketch

  • Then allowing a default TTL.

Extend Fix 1 above so that RedisChannelLayer spawns a thread that calls expire_unread_channels() on itself every M seconds, for some M (perhaps group_expiry/2?). This thread would use the configured group_expiry (mentioned earlier) as the default TTL to pass to expire_unread_channels().
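
Since channels_redis is asyncio-based, Fix 2 might be easier as a background task than a thread; a hedged sketch (the interval choice and wiring are assumptions):

    import asyncio

    # Hypothetical periodic sweep for Fix 2: call expire_unread_channels() on
    # the layer every group_expiry / 2 seconds, using group_expiry as the TTL.
    async def _expiry_loop(layer):
        while True:
            await asyncio.sleep(layer.group_expiry / 2)
            layer.expire_unread_channels(ttl=layer.group_expiry)

    def start_expiry_task(layer):
        # Call once from a running event loop, e.g. the first time the layer
        # is used; keep a reference so the task can be cancelled on shutdown.
        return asyncio.ensure_future(_expiry_loop(layer))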

@davidfstr
Author

davidfstr commented Oct 2, 2019

Prior art: Django's session framework has a similar problem with clearing expired session records. It provides the following API:

clear_expired()

  • Removes expired sessions from the session store. This class method is called by clearsessions.

We could spell expire_unread_channels() instead as clear_expired() for consistency, although I like the original name a bit better. :)
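
Following that pattern, wiring the proposed API into a management command might look roughly like this (entirely hypothetical: the command name, module path, and expire_unread_channels() itself only exist in the sketches above):

    # myapp/management/commands/clearchannels.py  (hypothetical path and name)
    from django.core.management.base import BaseCommand

    import channels.layers


    class Command(BaseCommand):
        help = "Expire channel-layer buffers that have not been read recently."

        def handle(self, *args, **options):
            layer = channels.layers.get_channel_layer()
            # expire_unread_channels() is the API proposed earlier in this thread.
            layer.expire_unread_channels(ttl=300)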

@carltongibson
Member

Yes, good. I like the build up.

If you're keen here, can I ask you to look at #165 first? It looks good at first sight, but I haven't had the bandwidth to sit down with it properly, so another pair of eyes there would be super handy!

@davidfstr
Author

@carltongibson Is #165 potentially related to this bug?

Reviewing that would require me to learn in detail how the current and proposed receiving systems work, which would take a block of time (~2 hrs) that is difficult for me to schedule in a timely fashion.

@carltongibson
Member

Hey @davidfstr.

...which would take a block of time...

Yep. Open source. :) (This is the exact hold-up there... BANDWIDTH). No stress, no rush — if you don't have capacity to look at it, no problem! But... if you can slowly have it on the back burner, that would be a great help. (As I say, no stress about it!!!)

Is it related? To the extent that it's on the short list of improvements to be made here, and we're looking at touching the receive buffer, yes.

But... have a glance at it and see what you think. I'm happy to accept your input as it's available, so you tell me. 🙂

@davidfstr
Author

Got it. Touches the receive buffer. So we'd have to reconcile these changes eventually anyway.

I'll keep the review in my queue. Projections at work have me looking at this again in 3 weeks, but I may get lucky and find some time earlier.

@carltongibson
Member

OK, thanks @davidfstr! (As I say, no stress! It's OSS 🙂)

@jheld

jheld commented Feb 26, 2020

Hello all! What's the status here? And what's the (average?) severity of it not being fixed (if that's even a reasonable question)?

All of the interactions with channels_redis on my project happen on a Celery worker, so worst case the worker fails (as opposed to ballooning RAM on an API machine), but I'd still rather find a way around that!

@davidfstr
Author

davidfstr commented Feb 26, 2020

Status: Bug doesn't hurt enough that anyone has put in the effort to fix it yet.

To work around this (and other kinds of slowish memory leaks), I've configured most of my web services to restart after serving (N + random jitter) requests. Gunicorn in particular has a config option for this.
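
For anyone reaching for the same workaround, the relevant Gunicorn settings are max_requests and max_requests_jitter, e.g. in gunicorn.conf.py (values are examples only):

    # gunicorn.conf.py -- recycle each worker after a bounded number of requests,
    # with jitter so the workers don't all restart at the same moment.
    max_requests = 1000
    max_requests_jitter = 100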

@adamhooper

@jheld You can use https://github.com/CJWorkbench/channels_rabbitmq (disclaimer: I'm the author) instead of channels_redis. It uses message TTLs, limits buffer sizes, warns on buffer overflow and recovers gracefully.

@davidfstr this bug did hurt enough that we put in the effort to fix it. The fix is channels_rabbitmq.

@ryanpetrello
Contributor

ryanpetrello commented Aug 12, 2020

@davidfstr @carltongibson

A version of this bug that I've also seen: you don't just hit this if the channel is never read from later. You can also hit it if the channel is read from too slowly. In the default channels_redis implementation, per-channel asyncio.Queue objects grow in an unbounded way; if they're not drained at the same rate as messages are inserted on the other end, Daphne will just keep growing in memory consumption forever.

I'd argue that these per-channel Queue objects should probably be bounded in size. There's already a capacity argument; maybe the per-channel buffer should respect that, and buffer only up to that many objects before dropping old ones?

https://github.com/django/channels_redis#capacity

I do think a goal of passive cleanup makes sense, but I think a reasonable upper bound on queue size would likely prevent many people from getting into bad situations in the first place.
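
A minimal sketch of that idea (not channels_redis code; the class and method names are invented): cap each per-channel buffer and evict the oldest message once the cap is hit.

    import asyncio

    # Hypothetical bounded per-channel buffer: never holds more than `capacity`
    # messages; the oldest message is dropped to make room for a new one.
    class BoundedBuffer:
        def __init__(self, capacity=100):
            self.capacity = capacity
            self._queue = asyncio.Queue()

        def put(self, message):
            if self._queue.qsize() >= self.capacity:
                # Drop the oldest buffered message instead of growing forever.
                self._queue.get_nowait()
            self._queue.put_nowait(message)

        async def get(self):
            return await self._queue.get()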

@carltongibson
Member

Hi @ryanpetrello -- Thanks for your work looking at this. Your points all seem reasonable.

If you want to block out proof-of-concept sketches, really happy to discuss those.

I'm working on the v3 update at the moment, and won't have capacity to cycle back to this immediately, so all help very much appreciated!

@adamhooper

I think this issue is on the wrong project. It should be on channels_redis.

channels_rabbitmq doesn't have this bug. It has local_capacity and local_expiry to solve this problem.
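
For anyone who wants to try it, a configuration sketch (the backend path and option names are as I understand them from the channels_rabbitmq README; check the README for exact defaults and units):

    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
            "CONFIG": {
                "host": "amqp://guest:guest@localhost/",
                # Bound the in-process buffer and expire undelivered messages.
                "local_capacity": 100,
                "local_expiry": 60,  # assumed to be seconds; see the README
            },
        },
    }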

I have helpful (but abrasive!) comments about the Redis layer's architecture -- and Channels' architecture in general. If you're dismayed by channels_redis behavior, here's a more complete picture:

The Channels layer spec -- and, heck, Channels' main loop -- has a big flaw: clients poll for messages instead of subscribing to them. By definition, a client calls receive() on a channel that may or may not exist. In the microseconds after a client's receive() returns and before it calls its next receive(), there is no client waiting on the channel and there may be no pending messages in the channel. According to the spec, the channel doesn't exist.

That leads to a corollary on the delivery side. When a Channels layer is delivering a message, it must deliver to non-existent channels!

channels_rabbitmq handles this with process-local expiry. channels_redis doesn't. (The channels_redis folks are welcome to copy channels_rabbitmq logic here.)

@carltongibson
Member

Hey @adamhooper — We like the comments 😀

Yes, I know it should be on channels_redis, but I don't want to close the issues until I've got the bandwidth to resolve them.

...clients poll for messages instead of subscribing to messages.

Really happy to look at sketches of a reworking there.

@adamhooper

adamhooper commented Aug 13, 2020

...clients poll for messages instead of subscribing to messages.

Really happy to look at sketches of a reworking there.

channels.consumer.AsyncConsumer.__call__:

    async def __call__(self, receive, send):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        """
        async with contextlib.AsyncExitStack() as stack:
            # Initialize channel layer
            self.channel_layer = get_channel_layer(self.channel_layer_alias)
            if self.channel_layer is not None:
                channel = await stack.enter_async_context(self.channel_layer.new_channel_v2())
                self.channel_name = channel.name
            # Store send function
            if self._sync:
                self.base_send = async_to_sync(send)
            else:
                self.base_send = send
            # Pass messages in from channel layer or client to dispatch method
            try:
                if self.channel_layer is not None:
                    await await_many_dispatch(
                        [receive, channel], self.dispatch
                    )
                else:
                    await await_many_dispatch([receive], self.dispatch)
            except StopConsumer:
                # Exit cleanly
                pass

A channel object is an async iterator: __anext__() returns a message, and aclose() stops the world.

I think it would be easier to write against this API. I don't have time to actually submit and test a pull request, though :).
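
A bare-bones sketch of what such a channel object could look like (entirely hypothetical; new_channel_v2() and this Channel class exist only in the proposal above):

    import asyncio


    class Channel:
        """Hypothetical subscription-style channel object from the proposal above."""

        def __init__(self, name):
            self.name = name
            self._queue = asyncio.Queue()
            self._closed = False

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self._closed:
                raise StopAsyncIteration
            return await self._queue.get()

        async def aclose(self):
            # Unsubscribe from the layer; a real implementation would also wake
            # or cancel any reader currently blocked in __anext__().
            self._closed = True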

@mekk1t

mekk1t commented Jan 12, 2024

Hello, everybody! Please tell me, is there any progress on this issue? Maybe close it if it's abandoned. I'm also experiencing a memory leak in a daphne-redis-websockets stack, though I can't find the cause.

@davidfstr
Author

When somebody steps forward to fix the issue, it will be closed.

@mekk1t

mekk1t commented Jan 14, 2024

Got it! Thank you.

@karatemir

Are the group_expiry or capacity configuration options enough to work around this issue, or do I need to restart the ASGI container to free the leaked memory?

@davidfstr
Author

I work around it by restarting the worker that runs Channels. From a previous comment of mine:

To work around this (and other kinds of slowish memory leaks), I've configured most of my web services to restart after serving (N + random jitter) requests. Gunicorn in particular has a config option for this.

@carltongibson carltongibson transferred this issue from django/channels Apr 1, 2024