Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: FoldStreamAsync #89

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

thinkbeforecoding
Copy link

This is a proposition for a FoldStreamAsync method on the .net client.
The goal is to make it easy to both fold the stream and get the last event number.
If you don't need the last event number, it not too compicated using Linq.Async:

client.ReadAsync(...).SelectMany(e => deserialize(e).ToAsyncEnumerable()).AggregateAsync(aggregator, seed);

The reason for the SelectMany is that, this way, the serializer can return an empty list to ignore an event, or a list with multiple value when a stored event would be better splitted in several events. In most cases the list contains a single element.
It is to be noted that a check on the client.ReadAsync result is necessary to avoid an exception on NotFound stream.

When appending an event to the stream, an expected version is required, and it should be the version of the last event.
In the code above, it means adding the version number along the results of the deserializer, the in the aggregator, maintaining the last version. it also has to be fed with the seed (this is not necessarily totally valid C# syntax):

   client.ReadAsync(..version..)
       .SelectMany(e => deserialize(e).Select(v => (v,e.Event.EventNumber)).ToAsyncEnumerable())
       .AggregateAsync(((_,state), (v,e)) => (v,aggregator(state,e)), (version, seed))

This is doable but tedious and introduce several intermediate enumerable and tuples that will bloat the GC.

This PR propose an implementation of FoldAsync including this recurring pattern in a more integrated way.

 client.FoldAsync(deserialize, aggregator, streamname, revision, seed);
  • The deserializer is a ResolvedEvent -> IEnumerable.
  • The aggregator a (T,E) -> T
  • The seed a T.
    The result is a structure that contains the final value of the aggregator and the revision of the last event

If the stream is empty of not found, it returns the seen with revision None or the input revision.

@thinkbeforecoding
Copy link
Author

There is of course more work to do on this PR (tests, documentation, I'm not even sure it's correctly using the GRPC Api), but the idea is to start a discussion. Especially about the ordering of parameters (there are good reasons for ordering them differently). About the deserializer (returning an IEnumerable or not), perf etc.

(and of course take time to answer this on your work time. Sorry for posting this during the weekend)

@thinkbeforecoding
Copy link
Author

(seems the builds failled for another reason..)

Copy link
Contributor

@thefringeninja thefringeninja left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loos good overall, just a couple of requested changes. Also needs some testing around it.

@thinkbeforecoding
Copy link
Author

About the argument order, for now I choose the following:

client.FoldAsync(deserialize, aggregator, streamname, revision, seed);

the reason is that deserialize and aggregator are the most stable arguments. after that, the streamname is usually changing but for a single stream we have many revisions. The revision and seed go together, as in the FoldResult. this is a revision an a state.

But to look more like the Linq Aggregate method, we could group (aggregator, seed), and (streamname,revision) .. one part being the fold part, the other the collection part..

@thinkbeforecoding
Copy link
Author

Last thing I need to be validated:

The function returns the Revision of the last Event read in the stream, or None if the stream is not found.

This is what is expected by AppendToStreamAsync expectedRevision, right ?

@thinkbeforecoding
Copy link
Author

Something else...
Let's say, I used Fold to compute current state, and get the last Event revision in the result (FoldResult).
From there, after some time I want to make sure that my state is uptodate but without reloading everything. I can just fold from result.Value. But if I pass result.Revision as a parameter to Fold, it will fold the last event
of the previous call again...

Would you recomment a NextRevision property on FoldResult to easily pass it to fold again ? (it would return Start when result.Revision is None)

@thinkbeforecoding
Copy link
Author

Now with better documentation, and unit tests.

@thinkbeforecoding
Copy link
Author

And this time, everything is green 🎉

@thinkbeforecoding thinkbeforecoding changed the title FoldStreamAsync Draft: FoldStreamAsync Dec 17, 2020
@thinkbeforecoding
Copy link
Author

thinkbeforecoding commented Dec 17, 2020

To be consistent, the FoldResult.Revision would ideally always be equal to the last event StreamPosition.
This works as expected for empty stream and when there are returned events but is not really possible yet when the input revision is at the end of the stream and no event gets returned.
Current implementation does a -1 on the input revision, but it's definitely a hack.
Could I have access to the last Event revision in this case ?
For instance, when the stream doesn't exist, a single element indicating it's not found is returned.
Could it be possible in this case that a similar element would be returned indicating the last revision in the stream ?

@thinkbeforecoding
Copy link
Author

This is mainly due to the way expectedVersion is understood by AppendToStream.
in AppendToStream, the expectedVersion is the position of the current last event in the stream. The return value indicated this new position.
But this position cannot be used in a snapshot, because reading from this position returns the last event that has already been taken into account. So in the snapshot, the version should be the position of the last event + 1.
This way, when reading a snapshot that is uptodate, the read/fold should return no events. The problem then is that the expected version to use is.... the version in the snapshot - 1.
we could avoid this complexity if expectedVersion was not the position of the last event, but the possition where we want to append: for an empty stream it would be 0. for a stream with a single event a pos 0, it would be 1. and the function AppendToStream would return the number where we should append next.
Of course, AppendToStream would reject writes to streams where this version is not exactly the next one.

thefringeninja
thefringeninja previously approved these changes Jan 7, 2021
@thefringeninja
Copy link
Contributor

After discussing with the team it looks like we'll need to make a proto change to make this work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants