Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventStream class based on 'pulse' prototype #1990

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

lordmauve
Copy link

Add the EventStream class, which allows multiple subscribers to a stream of events without exhibiting a missing wakeups problem.

I think the documentation is far from fully fleshed out here - I want to draw up a diagram of the read/write cursor interpretation for EventStream - but this is the bare bones of the direction I took this so I'm submitting it for sokme early feedbac.

@codecov
Copy link

codecov bot commented Apr 30, 2021

Codecov Report

Merging #1990 (c84e9dd) into master (d793272) will decrease coverage by 0.05%.
The diff coverage is 92.38%.

@@            Coverage Diff             @@
##           master    #1990      +/-   ##
==========================================
- Coverage   99.56%   99.51%   -0.06%     
==========================================
  Files         114      114              
  Lines       14618    14722     +104     
  Branches     1117     1127      +10     
==========================================
+ Hits        14554    14650      +96     
- Misses         43       49       +6     
- Partials       21       23       +2     
Impacted Files Coverage Δ
trio/__init__.py 100.00% <ø> (ø)
trio/_sync.py 98.63% <87.87%> (-1.37%) ⬇️
trio/tests/test_sync.py 99.17% <94.44%> (-0.83%) ⬇️

Copy link
Member

@njsmith njsmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks pretty clean overall. Thanks for picking this up! I think it'll get used a lot.

The addition of close is an interesting idea. Do you have a use case in mind? So far I've always used this for cases where there's some persistent data and subscribers are waiting for it to change, so "closing" isn't very intuitive -- the data might stop changing, but it's still there! But it's also a natural addition in some ways...

I'm uncertain about the value of coalesce=False – can you make the case for it here so we can consider? Ditto for from_start=False, and particularly for making it the default.
skeptical about coalesce=False

Naming: naming is always hard, and especially for something like this, where there's no prior art (that I know of). But, it's important, since I think this is one of those cases where at first look the semantics are counterintuitive and hard to explain, even though they feel very natural once you wrap your head around them. So if the name can point people in the right direction, it'll help a lot.

"Stream" is maybe not ideal, because we use that as a term of art for byte streams. But the big thing I want is a name that hints at the coalescing, and the suitability for tracking persistent state rather than transient events.

VersionTracker? UpdateNotifier? ChangeNotifier? Refresher? StateTracker?

except:
import traceback
traceback.print_exc()
raise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was pytest not printing the traceback properly somehow while you were developing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it was reporting a ResourceWarning about an async generator being disposed:

E               Traceback (most recent call last):                                                                                                                       
E                 File "/home/mauve/dev/trio/trio/_core/_asyncgens.py", line 79, in finalizer                                                                           
E                   warnings.warn(                                                                                                                                       
E               ResourceWarning: Async generator 'trio._sync.EventStream.subscribe' was garbage collected before it had been exhausted. Surround its use in 'async with a
closing(...):' to ensure that it gets cleaned up as soon as you're done using it.                                                                                        

I didn't dig into what you're doing here, and it sounds a bit odd to me, but I rolled with it. This is one reason I added the .close() method (but I think it is a good idea anyway).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, ok. A lot going on here :-)

I don't see how the except block could interact with that warning. The warning is generated in "GC context", where raising exceptions is impossible. Well, it's really a fake GC context, because we re-enter Trio and run the finalizer in async context, so Trio tries to emulate the interpreter's regular GC context... it's possible we don't do it perfectly and some kind of exception escapes somewhere. But I don't think so?

Also, that warning ought to be harmless in this case. It's pointing out that if the async generator is holding any resources like file descriptors, then they are being cleaned up by the GC rather than explicitly. Is this a useful thing to point out? tbh I'm not sure. ResourceWarnings are hidden by default so it's not necessarily a big deal to have some false positives. But in this case it's pure noise, since subscribe doesn't hold any resources. And CPython doesn't bother pointing this out when it GC's regular sync generators -- the only reason we can do it is because the GC delegates async generator cleanup to Trio. Maybe it would be better to just drop the warning entirely? It's kind of a mess.

@oremanj, any thoughts you want to add?

"""
if self._wakeup is None:
self._wakeup = trio.Event()
await self._wakeup.wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're worried about efficiency, then also consider implementing this with a set and wait_task_rescheduled – basically inlining Event.

.. autoclass:: EventStream
:members:


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put the docs for this right after the docs for Event -- "if you clicked here because you were looking for multiple events, then maybe you want...". (Maybe add a cross-ref at the top of the Event docs too.)

@lordmauve
Copy link
Author

The addition of close is an interesting idea. Do you have a use case in mind?

I find it's generally useful to have a way to signal receivers to stop waiting. If you were using this to notify writes to a log file, say, then you could close when you rotate log files, causing them to exit the loop and pick up the next log file. At least, there are two outcomes for anext(), yield a value or stop iteration. By adding close() an event sender can cause its receiver to take either path, and that seems like a generally useful capability.

There's also a parallel with memory channels, when you view an EventStream as a transaction log.

I'm uncertain about the value of coalesce=False – can you make the case for it here so we can consider? Ditto for from_start=False, and particularly for making it the default.

I've mentally generalised EventStream as a transaction log, where each reader is independent and consumes all transactions in the transaction log. The EventStream class exposes integer read cursors and write cursors which indicate the position in a stream of transactions, but these counters were built into Pulse already; this is an interpretation of them.

The EventStream class doesn't hold the transactions, which I think is fine for a concurrency primitive. But what history you hold and where is up to you. You could hold a full list of transactions in memory (this doesn't scale, but it is helpful for learners), in which case the read_cursor yielded by subscribe() is an index into a list. You could hold bounded history, like 100 events in a deque. A use case here is, say, broadcasting terminal output to many connected subscribers, and a change on the terminal is a small delta of a few bytes. If a user is behind by <=100 deltas it's best to stream them the deltas, but if they are further behind then maybe you cancel the subscription, send them a full-screen snapshot and then start streaming deltas again. And then there's the case where there's zero history kept. A new event causes a wakeup but woken tasks only need to act on the status quo at the time they are woken. I think this is the aspect you have in mind.

So those are three use cases that only really vary by the amount of transaction history you expect to hold - infinite, finite, or zero.

In the infinite/finite cases, it might be most convenient to subscribe without coalescing because you want to process each transaction in turn (but maybe it's cheaper to coalesce in order to act in bulk). In the zero history case you certainly want coalescing.

If you're holding bounded history then you will certainly start each subscriber at the current write cursor. The start of the entire transaction stream isn't data you hold any more. For infinite history, the same is probably true; you don't want to send an unbounded number of transactions to a new subscriber, so makes sense to send a slice of recent data and then subscribe at the end. For zero history, we're going to want to coalesce anyway, so from_start just offers the choice of whether to iterate once on subscribe (if there have been previous events) or iterate only on the next event after the subscription starts. Both options make sense to me in different contexts.

@njsmith
Copy link
Member

njsmith commented May 11, 2021

You could hold bounded history, like 100 events in a deque. A use case here is, say, broadcasting terminal output to many connected subscribers, and a change on the terminal is a small delta of a few bytes. If a user is behind by <=100 deltas it's best to stream them the deltas, but if they are further behind then maybe you cancel the subscription, send them a full-screen snapshot and then start streaming deltas again.

This is a really interesting example, thanks!

I feel like it might be even easier to implement with coalescing, though? Something like:

prev_event = 0
async for latest_event in obj.subscribe():
    gap = latest_event - prev_event
    if gap <= 100:
        await send_deltas(history_deque[-gap:])
    else:
        await send_snapshot()
    prev_event = latest_event

OTOH if you don't coalesce events, I haven't actually implemented it, by it doesn't feel easy to me -- I think you have to separately track the latest-event counter to compare to the index you get from subscribe to figure out where the event is in the history, and if you do send a snapshot then you still have to iterate through all the old stale events to catch up, even though you aren't doing anything with them. Also I bet send_deltas can do something more efficient if you give it a whole batch of deltas all at once (e.g., pack several into a single packet), versus calling it separately for each individual delta. What do you think?

Oh hmm though, I just realized that the above code relies on a very subtle invariant: there are no checkpoints between when subscribe picks the value for latest_event, and when we look at the history deque. I... think that's probably a good invariant for subscribe to guarantee? And the PR's code does in fact guarantee it. But it's subtle enough that it probably requires some careful discussion in the docs. (And if you do switch to using wait_task_rescheduled directly, then you have to be careful to read the current value after waking up, rather than passing it to reschedule.)

Of course the other advantage of coalescing is that it nudges you away from designs that require infinite history. This is another version of the backpressure problem: in every real system like this, you need some strategy to catch up a consumer that's arbitrarily far behind. Or put another way, if you really want to send every item exactly once and you don't care about the costs, you should be using something like a memory channel, not this thing :-).

Re: from_start and also the name: On further thought, I realized I really like the name EventCounter for this primitive. It really points out the similarities/differences to Event in a natural way, makes clear what this weird incrementing integer is, and I think communicates a lot of the intuition we want. It also suggests a slightly different semantics for subscribe's first iteration:

  • The global counter starts at zero
  • Each call to subscribe starts its local counter at zero
  • subscribe yields whenever the global counter is larger than the local counter

So in this approach, initially subscribe's first iteration will block until at least one event has happened, and then once an event has happened all future subscribes will yield the current counter immediately. The example code above actually depends on this as well, sort of.

...OTOH, unconditionally yielding immediately on first iteration is probably more convenient for all the places where I've been using this, where you're waiting for a condition to become true. Of course you could trigger the counter immediately when you set it up, but it feels error-prone. Maybe yielding 0 is better, and the example above should be tweaked to handle that appropriately. It also has the nice property that if your code isn't expecting an immediate yield, you'll probably notice quickly and fix that, because it will happen on every test case.

Do you have any examples where you specifically don't want to yield immediately on first subscribe, especially where there have been previous events? I'm still struggling to visualize those.

@njsmith
Copy link
Member

njsmith commented Aug 8, 2021

Here's some other inventions of the same fundamental idea. None of this affects the PR really, but I think it's nice to have cross-references in the tracker history, and it's nice to see multiple independent efforts converge around the same abstraction:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants