Implement ordered list state for FnApi. #30317

Open
wants to merge 19 commits into
base: master

shunping
Contributor

@shunping shunping commented Feb 14, 2024

First attempt to implement ordered list state for FnApi. Note that caching has not been implemented yet.

Reference:
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/


@shunping shunping marked this pull request as draft February 14, 2024 20:49
@shunping
Contributor Author

Run Python PreCommit 3.9

Contributor

@scwhittle scwhittle left a comment

Didn't get yet to implementation/test

// A response to the get state request for an ordered list.
message OrderedListStateGetResponse {
bytes continuation_token = 1;
bytes data = 2;
Contributor

how do we return multiple elements if the request was a range?

should we return the sort-key for elements, that seems like part of the user-data for example if it's some id/timestamp the user might want it back instead of having to duplicate it in the payload as well.

should this be repeated OrderedListEntry instead of data?

Contributor

For efficiency reasons, we have generally not split individual elements into individual proto fields; instead we provide them as contiguous bytes of data. It makes sense to do that here. But the coding should be specified (e.g., is this a concatenation of several encoded KV<sort-key, value>s?).

Contributor

We should also consider the encoding of the sort key. If sort keys are (typically) timestamps, big-endian might be preferable to varint.
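The thread does not spell out why big-endian might be preferable. One plausible reason, offered here only as an assumption, is that fixed-width big-endian encodings of non-negative keys compare in numeric order when the bytes are compared lexicographically, while base-128 varints do not. A minimal sketch (the class and method names are hypothetical and not part of this PR):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class SortKeyEncodingSketch {
  // Fixed-width big-endian encoding: 8 bytes, most significant byte first.
  // ByteBuffer uses big-endian byte order by default.
  static byte[] bigEndian(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Base-128 varint: 7 payload bits per byte, least significant group first,
  // with the high bit set on every byte except the last.
  static byte[] varint(long value) {
    byte[] buffer = new byte[10];
    int n = 0;
    while ((value & ~0x7FL) != 0) {
      buffer[n++] = (byte) ((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    buffer[n++] = (byte) value;
    return Arrays.copyOf(buffer, n);
  }

  // Lexicographic comparison that treats each byte as unsigned.
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Big-endian keeps 255 before 256; the varint encodings compare the other way round.
    System.out.println(compareUnsigned(bigEndian(255), bigEndian(256)) < 0);
    System.out.println(compareUnsigned(varint(255), varint(256)) > 0);
  }
}
```

Note that this ordering property only holds for non-negative keys as written; negative two's-complement values would need the sign bit flipped before byte-wise comparison.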

Contributor Author

@shunping shunping Feb 16, 2024

how do we return multiple elements if the request was a range?

We concatenate the encoded entries in a byte array and send them back in chunks with corresponding continuation token.

should this be repeated OrderedListEntry instead of data?

I have considered this option. Besides the efficiency reason @robertwb mentioned, I also find that representing the response as bytes has the advantage of reusing the existing code in https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java. This iterator is used to parse the returned data block (not only for OrderedListState but also for Multimap, Bag, etc.), and it supports blocks even if their boundaries are not aligned with entries/elements. I don't think this is achievable with OrderedListEntry.

But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s

That's right. It is basically the concatenation of encoded TimestampedValue, since TimestampedValue is already in use in the sdk interface of OrderedListState

extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {
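To illustrate the point about block boundaries, here is a minimal sketch of decoding concatenated entries that arrive in chunks which do not line up with entry boundaries. The wire format used here (8-byte big-endian sort key, 4-byte length, UTF-8 payload) is an assumption made for the example; it is not the actual TimestampedValue coder, and the class is hypothetical rather than the StateFetchingIterators code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ChunkedEntriesSketch {
  // Concatenates entries using the assumed format: sort key, payload length, payload.
  static byte[] encode(long[] sortKeys, String[] values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < sortKeys.length; i++) {
        byte[] payload = values[i].getBytes(StandardCharsets.UTF_8);
        out.writeLong(sortKeys[i]);
        out.writeInt(payload.length);
        out.write(payload);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Splits the byte array into fixed-size chunks, ignoring entry boundaries.
  static List<byte[]> split(byte[] data, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    for (int i = 0; i < data.length; i += chunkSize) {
      chunks.add(Arrays.copyOfRange(data, i, Math.min(data.length, i + chunkSize)));
    }
    return chunks;
  }

  // Decodes entries from the chunks as one logical stream, so an entry may span chunks.
  static List<String> decode(List<byte[]> chunks) {
    List<InputStream> streams = new ArrayList<>();
    for (byte[] chunk : chunks) {
      streams.add(new ByteArrayInputStream(chunk));
    }
    DataInputStream in =
        new DataInputStream(new SequenceInputStream(Collections.enumeration(streams)));
    List<String> entries = new ArrayList<>();
    try {
      while (true) {
        long sortKey;
        try {
          sortKey = in.readLong();
        } catch (EOFException endOfData) {
          break; // No more entries (a truncated trailing key is not detected in this sketch).
        }
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        entries.add(sortKey + "=" + new String(payload, StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return entries;
  }

  public static void main(String[] args) {
    byte[] data = encode(new long[] {1L, 2L, 3L}, new String[] {"a", "bb", "ccc"});
    System.out.println(decode(split(data, 5)));
  }
}
```

With a repeated message field, each chunk would have to contain whole entries; with raw bytes, the reader can simply treat the chunks as a single stream.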

// timestamp range is not specified, the runner should use [MIN_INT64,
// MAX_INT64) by default.
message OrderedListStateGetRequest {
bytes continuation_token = 1;
Contributor

comment on continuation token, ie should be returned by previous response and the key/range should match the previous request generating the token

Contributor

Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

Contributor Author

@shunping shunping Feb 16, 2024

comment on continuation token ...

Good idea! I will add that when I revise the code.

Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

I don't think the range is a hard requirement.
In fact in my simple implementation of the fake client, I put the current sort key and the current block index into the continuation token:

// The continuation format here is the sort key (long) followed by an index (int)
In other implementations, you may need the range, but I think that is implementation-dependent. That's why I hesitate to add a range/sort key as a separate field when a continuation token is present.

I thought the continuation token should allow the runner to uniquely identify where to find the next block of data. Is that what you mean by "mutually exclusive"?

Contributor

Yes, that's what I meant by mutually exclusive. If we allow providing both, we have to figure out what to do when they disagree, and there's no good usecase for that.
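As a rough illustration of an opaque token that lets the runner find the next block on its own, here is a sketch of the layout described above for the fake client: a sort key (long) followed by a block index (int). Only that field order comes from the thread; the byte order, class name, and method names are assumptions for the example.

```java
import java.nio.ByteBuffer;

final class ContinuationTokenSketch {
  // Packs the current sort key and block index into a 12-byte token.
  static byte[] encode(long sortKey, int blockIndex) {
    return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
        .putLong(sortKey)
        .putInt(blockIndex)
        .array();
  }

  // Reads back the sort key stored in the first 8 bytes.
  static long sortKey(byte[] token) {
    return ByteBuffer.wrap(token).getLong(0);
  }

  // Reads back the block index stored after the sort key.
  static int blockIndex(byte[] token) {
    return ByteBuffer.wrap(token).getInt(Long.BYTES);
  }

  public static void main(String[] args) {
    byte[] token = encode(1700000000000L, 3);
    System.out.println(sortKey(token) + " / " + blockIndex(token));
  }
}
```

Because the token already carries the resume position, the SDK never needs to interpret it or send a range alongside it.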


// A request to update an ordered list
message OrderedListStateUpdateRequest {
// when the request is processed, deletes should always happen before inserts.
Contributor

Should we instead split this up into two separate requests, e.g. OrderedListStateInsertRequest and OrderedListStateDeleteRequest?

Contributor Author

Discussion 2 in the design doc talked about this, but there is no conclusion on that https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/

I am fine with splitting them though.

Member

It would be helpful to outline the pros and cons of little decisions like this in the design doc, and to note which option was chosen and why.

For example one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests but each one has an empty field.

And note whether there is an efficiency consideration.
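The ordering concern can be made concrete with a small sketch of a combined update in which every delete range is applied before any insert. This models the state as an in-memory TreeMap purely for illustration; the class and method names are hypothetical and do not reflect how a runner stores ordered list state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

final class UpdateOrderSketch {
  // Applies one combined update: all delete ranges [start, end) first, then all inserts.
  static void applyUpdate(
      TreeMap<Long, List<String>> state,
      long[][] deleteRanges,
      long[] insertKeys,
      String[] insertValues) {
    for (long[] range : deleteRanges) {
      // subMap(from, to) is a live view with from inclusive and to exclusive.
      state.subMap(range[0], range[1]).clear();
    }
    for (int i = 0; i < insertKeys.length; i++) {
      state.computeIfAbsent(insertKeys[i], k -> new ArrayList<>()).add(insertValues[i]);
    }
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> state = new TreeMap<>();
    state.put(1L, new ArrayList<>(Arrays.asList("old1")));
    state.put(2L, new ArrayList<>(Arrays.asList("old2")));
    // Delete [1, 3) and insert at key 2 in the same update.
    applyUpdate(state, new long[][] {{1L, 3L}}, new long[] {2L}, new String[] {"new2"});
    System.out.println(state);
  }
}
```

Under this ordering the new value at key 2 survives even though key 2 falls inside the deleted range. Getting the opposite effect (insert, then delete) would take two requests, which is the asymmetry described above.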

Contributor Author

@shunping shunping Feb 23, 2024

Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.

message OrderedListStateUpdateRequest {
// when the request is processed, deletes should always happen before inserts.
repeated OrderedListRange deletes = 1;
repeated OrderedListEntry inserts = 2;
Contributor

Should there be symmetry between the written and read data? E.g. one writes bytes that are encoded KV<sort_key, data> and then reads them?

;
try {
if (sortKey < start || sortKey >= end) {
throw new IndexOutOfBoundsException("sort key out of range");
Contributor

OK, this goes back to the idea that range and (opaque) continuation token should be mutually exclusive.

Contributor Author

See my comment in a previous thread: #30317 (comment)

* The range information is placed in the state key of ordered list
* For consistency, we reuse the existing get request and response
  messages of other states like Bag, MultiMap, etc.
* Reuse existing messages of clear and append.
* Replace String::size() > 0 with String::isEmpty()
* Return this in readLater and readRangeLater instead of throwing
  an exception
* Remove the added SuppressWarnings("unchecked")
Previously, we used a repeated OrderedListEntry field in the
AppendRequest particularly for ordered list state. For consistency,
we now get rid of that and use the same data field as other states.
@shunping shunping marked this pull request as ready for review March 6, 2024 16:42
Contributor

github-actions bot commented Mar 6, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@shunping
Contributor Author

shunping commented Mar 6, 2024

Run Java PreCommit

createOrderedListUserState(key, elemCoder);

@Override
public void clear() {
Contributor

Do we want a separate notion of "clear"? i.e. one that deletes all state associated with this OrderedList and not just clearRange(min, max), which would only delete all the elements from the OrderedList?

Contributor Author

The clear() function in OrderedListState is quite similar to its counterpart in MultiMap, and I don't think we have ever had the notion of deleting all state associated with a state object. There is one function called "asyncClose()", which syncs the local copy with the remote one and then invalidates any state associated with the state object.

On a different topic, I notice that we have a clear() function in the implementation, which handles the special case of deleting all elements from an OrderedList more efficiently than clearRange(min, max). I should probably use that instead.

@@ -1075,6 +1103,12 @@ message StateClearRequest {}
// A response to clear state.
message StateClearResponse {}

// A message describes a sort key range [start, end).
Contributor

Maybe add a comment about these values needing to be within [BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE)? Should these be the default values for these fields?

Contributor Author

ack

import org.joda.time.Instant;

/**
* An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
Contributor

ordered list

Contributor Author

ack

// Remove items (in a collection) in the specific range from pendingAdds.
// The old values of the removed sub map are kept, so that they will still be accessible in
// pre-existing iterables even after the sort key is cleared.
pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear();
Contributor

I think this is the same as just calling .subMap(minTimestamp, limitTimestamp).

Contributor Author

You are right. Will make the change.
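For reference, a small sketch showing that the two-argument subMap on a TreeMap uses the same half-open bounds (from inclusive, to exclusive) as the four-argument call with `true, false`, and that clearing the returned view removes the mappings from the backing map. The class and helper names are hypothetical, and plain longs stand in for the Instant keys.

```java
import java.util.TreeMap;

final class SubMapSketch {
  // Builds a small map with keys 1..5 for the comparison.
  static TreeMap<Long, String> sample() {
    TreeMap<Long, String> map = new TreeMap<>();
    for (long key = 1; key <= 5; key++) {
      map.put(key, "v" + key);
    }
    return map;
  }

  // Clears [min, limit) using either the four-argument or the two-argument overload.
  static TreeMap<Long, String> clearRange(long min, long limit, boolean useFourArgOverload) {
    TreeMap<Long, String> map = sample();
    if (useFourArgOverload) {
      map.subMap(min, true, limit, false).clear();
    } else {
      map.subMap(min, limit).clear();
    }
    return map;
  }

  public static void main(String[] args) {
    // Both calls leave the same keys behind: 1, 4, 5.
    System.out.println(clearRange(2, 4, true));
    System.out.println(clearRange(2, 4, false));
  }
}
```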

public void asyncClose() throws Exception {
isClosed = true;

if (!pendingRemoves.isEmpty()) {
Contributor

Can processing the pendingRemoves and pendingAdds be combined into a single StateRequest across the FnApi? Fewer roundtrips would probably be better.

Contributor

Also, it'd be nice if we could clear multiple ranges in a single request.

Contributor Author

@shunping shunping May 2, 2024

Can processing the pendingRemoves and pendingAdds be combined into a single StateRequest across the FnApi? Fewer roundtrips would probably be better.

Hmm, again this is about whether we want to be consistent with other states and reuse the existing StateRequest, or create a new one specifically for OrderedListState.

We had this discussion previously at #30317 (comment), and I think the decision was to follow the existing states and reuse the current StateRequest, which means we will have Append to do the insertion only and Clear to do the deletion.

Also, it'd be nice if we could clear multiple ranges in a single request.

There is no extra field in Clear request message; it is invoked on a state key that contains a single range. That's why we only do one range at a time.

// (1) a sort key is added to or removed from pendingAdds, or
// (2) a new value is added to an existing sort key
ArrayList<PrefetchableIterable<TimestampedValue<T>>> pendingAddsInRange = new ArrayList<>();
for (Entry<Instant, Collection<T>> kv :
Contributor

I'm not super-clear on how Iterables work in Java. Does this actually copy the elements, or is it just wrapping the values in pendingAdds in an Iterable? I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.

Contributor Author

@shunping shunping May 2, 2024

Does this actually copy the elements, or is it just wrapping the values in pendingAdds in an Iterable?

Iterables in java work like iterables in other languages. Basically, we use it to traverse an existing collection of elements without making a copy of them.

I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.

The PrefetchableIterable interface in Beam is specifically for iterables that support prefetching. The function PrefetchableIterables.limit() is applied on an iterable, resulting in a new iterable, which can be used to traverse its backend collection until the number of elements reaches a preset limit.

Here is the tricky part.

  • Note that pendingAddsInRange is a list of iterables. For each of these iterables, when an iteration is performed at any time after its creation, the result will be truncated to the last element we see at the time we initialize the iterable with PrefetchableIterables.limit(). In other words, appending a new element to the end of an existing key of pendingAdds would not affect the outcome above, because the iterable in pendingAddsInRange is designed to be truncated to the size when it is created.
  • When we add a new key to pendingAdds, it won't change the outcome of traversing a pre-existing pendingAddsInRange either, because the iterable of the new key is not included in pendingAddsInRange.
  • When we delete an existing key from pendingAdds, we remove the key-value mapping but do not touch its value (i.e. the collection of elements). Therefore, the iterables in a pre-existing pendingAddsInRange will still work, because the backing collection remains accessible. These collections are garbage collected only when no iterables reference them.
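The three cases above can be sketched with a plain-Java stand-in for the size-limited view. This is not PrefetchableIterables.limit(); the `limit` helper below is a hypothetical, simplified equivalent that wraps the backing list without copying and stops after the size captured at creation time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;

final class LimitedViewSketch {
  // Returns a view over the first `size` elements of `backing`; no elements are copied.
  static <T> Iterable<T> limit(List<T> backing, int size) {
    return () ->
        new Iterator<T>() {
          private int position = 0;

          @Override
          public boolean hasNext() {
            return position < size;
          }

          @Override
          public T next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            return backing.get(position++);
          }
        };
  }

  // Drains an iterable into a list, for inspection.
  static <T> List<T> toList(Iterable<T> iterable) {
    List<T> result = new ArrayList<>();
    for (T element : iterable) {
      result.add(element);
    }
    return result;
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> pendingAdds = new TreeMap<>();
    List<String> values = new ArrayList<>(Arrays.asList("a", "b"));
    pendingAdds.put(1L, values);

    Iterable<String> view = limit(values, values.size());
    values.add("c"); // Appending to an existing key is not visible through the view.
    pendingAdds.remove(1L); // Removing the mapping leaves the collection itself intact.
    System.out.println(toList(view));
  }
}
```

The view keeps a reference to the list, so the list stays reachable after its key is removed from the map, which matches the garbage-collection remark in the last bullet.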

}

@Test
public void testAdd() throws Exception {
Contributor

Do we need to have some tests where we're partway through an Iterable when the element gets added? In all of the ones I see here we completely process each Iterable before we do any further operations on it (by calling Iterables.toArray).

Contributor Author

Good point! I will add some more tests to cover this.

Contributor

Do we also need to add something here:

// TODO(https://github.com/apache/beam/issues/20486): Add protocol to support OrderedListState

Contributor

Or are we going to piggy-back on multimap for this? (If so we should delete the TODO.)

Contributor Author

@shunping shunping May 2, 2024

Yes. I think we will need a separate one for orderedlist because although orderedlist and multimap have similar semantics, the code paths are totally different.

Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

Contributor

Reminder, please take a look at this pr: @kennknowles

Contributor

github-actions bot commented Apr 3, 2024

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.

@Abacn
Contributor

Abacn commented Apr 3, 2024

waiting on author

Contributor

Reminder, please take a look at this pr: @Abacn

Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

@shunping
Contributor Author

stop reviewer notifications

Contributor

Stopping reviewer notifications for this pull request: requested by reviewer


6 participants