-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement ordered list state for FnApi. #30317
base: master
Are you sure you want to change the base?
Conversation
If there are changes on a state after we obtain iterators from calling read() and readRange(), the behavior of these pre-existing iterators were incorrect in the previous implementation. The change introduced here will make sure that these iterators will still work as if no local change is made.
ac40716
to
5d7cd5e
Compare
Run Python PreCommit 3.9 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't get yet to implementation/test
// A response to the get state request for an ordered list. | ||
message OrderedListStateGetResponse { | ||
bytes continuation_token = 1; | ||
bytes data = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we return multiple elements if the request was a range?
should we return the sort-key for elements, that seems like part of the user-data for example if it's some id/timestamp the user might want it back instead of having to duplicate it in the payload as well.
should this be repeated OrderedListEntry instead of data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For efficiency reasons we have generally not split individual elements up into individual field protos, and provided them as contiguous bytes of data. Makes sense to do that here. But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also consider if there are considerations in the encoding of sort-key. If they're (typically) timestamps, bigendian might be preferable to varint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we return multiple elements if the request was a range?
We concatenate the encoded entries in a byte array and send them back in chunks with corresponding continuation token.
should this be repeated OrderedListEntry instead of data?
I have considered this option. Besides the efficiency reason @robertwb mentioned, I also find that representing the response as bytes has an advantage of reusing the existing code in https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java. This iterator is used to parsed the returned data block (in not only OrderedListState but also Multimap, Bag, etc) and it supports blocks even if the boundary is not aligned with entries/elements. I think this is not achievable with OrderedListEntry.
But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s
That's right. It is basically the concatenation of encoded TimestampedValue, since TimestampedValue is already in use in the sdk interface of OrderedListState
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
Line 29 in 3693174
extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> { |
// timestamp range is not specified, the runner should use [MIN_INT64, | ||
// MAX_INT64) by default. | ||
message OrderedListStateGetRequest { | ||
bytes continuation_token = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on continuation token, ie should be returned by previous response and the key/range should match the previous request generating the token
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on continuation token ...
Good idea! I will add that when I revise the code.
Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?
I don't think the range is a hard requirement.
In fact in my simple implementation of the fake client, I put the current sort key and the current block index into the continuation token:
beam/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
Line 228 in 5d7cd5e
// The continuation format here is the sort key (long) followed by an index (int) |
In other implementation, you may need the range, but I think it is implementation-dependent. That's why I hesitate to put a range/sort key as a separate field when continuation token is present.
I thought the continuation token should allow the runner to uniquely identify where to find the next block of data. Is that what you mean by "mutually exclusive"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's what I meant by mutually exclusive. If we allow providing both, we have to figure out what to do when they disagree, and there's no good usecase for that.
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
Outdated
Show resolved
Hide resolved
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
Outdated
Show resolved
Hide resolved
// timestamp range is not specified, the runner should use [MIN_INT64, | ||
// MAX_INT64) by default. | ||
message OrderedListStateGetRequest { | ||
bytes continuation_token = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?
// A response to the get state request for an ordered list. | ||
message OrderedListStateGetResponse { | ||
bytes continuation_token = 1; | ||
bytes data = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For efficiency reasons we have generally not split individual elements up into individual field protos, and provided them as contiguous bytes of data. Makes sense to do that here. But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s?
|
||
// A request to update an ordered list | ||
message OrderedListStateUpdateRequest { | ||
// when the request is processed, deletes should always happen before inserts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we instead split this up into two separate requests, e.g. OrderedListStateInsertRequest and OrderedListStateDeleteRequest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion 2 in the design doc talked about this, but there is no conclusion on that https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/
I am fine with splitting them though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be helpful to outline the pro/con in the design doc of little decisions like, and note which one was chosen and why.
For example one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests but each one has an empty field.
And note whether there is an efficiency consideration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.
message OrderedListStateUpdateRequest { | ||
// when the request is processed, deletes should always happen before inserts. | ||
repeated OrderedListRange deletes = 1; | ||
repeated OrderedListEntry inserts = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be symmetry between the written and read data? E.g. one writes bytes that are encoded KV<sort_key, data> and then reads them?
// A response to the get state request for an ordered list. | ||
message OrderedListStateGetResponse { | ||
bytes continuation_token = 1; | ||
bytes data = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also consider if there are considerations in the encoding of sort-key. If they're (typically) timestamps, bigendian might be preferable to varint.
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
; | ||
try { | ||
if (sortKey < start || sortKey >= end) { | ||
throw new IndexOutOfBoundsException("sort key out of range"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, this goes back to the idea that range and (opaque) continuation token should be mutually exclusive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment in a previous thread: #30317 (comment)
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
Outdated
Show resolved
Hide resolved
* The range information is placed in the state key of ordered list * For consistency, we reuse the existing get request and response mesasages of other states like Bag, MultiMap, etc.
* Reuse existing messages of clear and append.
* Replace String::size() > 0 with String::isEmpty() * Return this in readLater and readRangeLater instead of throwing an exception * Remove the added SupressWarnings("unchecked")
Previously, we used a repeated OrderedListEntry field in the AppendRequest particularly for ordered list state. For consistency, we now get rid of that and use the same data field as other states.
5a438a3
to
90b7f7d
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Run Java PreCommit |
createOrderedListUserState(key, elemCoder); | ||
|
||
@Override | ||
public void clear() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want a separate notion of "clear"? i.e. one that deletes all state associated with this OrderedList and not just clearRange(min, max)
, which would only delete all the elements from the OrderedList?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The clear() function in OrderedListState is quite similar to its counterpart in MultiMap, and I don't think we ever has the notion of deleting all states associated with a state object. There is one function called "asyncClose()" which will sync the local copy with the remote one and then invalidate any states associated with the state object.
On a different topic, I notice that we have a clear() function in the implementation, which handles the special case of deleting all elements from an OrderedList more efficiently than clearRange(min, max). I should probably use that instead.
@@ -1075,6 +1103,12 @@ message StateClearRequest {} | |||
// A response to clear state. | |||
message StateClearResponse {} | |||
|
|||
// A message describes a sort key range [start, end). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment about these values needing to be within [BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE)
? Should these be the default values for these fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
import org.joda.time.Instant; | ||
|
||
/** | ||
* An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ordered list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
// Remove items (in a collection) in the specific range from pendingAdds. | ||
// The old values of the removed sub map are kept, so that they will still be accessible in | ||
// pre-existing iterables even after the sort key is cleared. | ||
pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is the same as just calling .subMap(minTimestamp, limitTimestamp)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Will make the change.
public void asyncClose() throws Exception { | ||
isClosed = true; | ||
|
||
if (!pendingRemoves.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can processing the pendingRemoves and pendingAdds be combined into a single StateRequest across the FnApi? Fewer roundtrips would probably be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it'd be nice if we could clear multiple ranges in a single request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can processing the pendingRemoves and pendingAdds be combined into a single StateRequest across the FnApi? Fewer roundtrips would probably be better.
Hmmmm, again this is about whether we want to be consistent with other states and reuse the existing StateRequest OR create a new one particularly for OrderListState.
We had this discussion previously at #30317 (comment), and I think the decision was to follow the existing states and reuse the current StateRequest, which means we will have Append to do the insertion only and Clear to do the deletion.
Also, it'd be nice if we could clear multiple ranges in a single request.
There is no extra field in Clear request message; it is invoked on a state key that contains a single range. That's why we only do one range at a time.
// (1) a sort key is added to or removed from pendingAdds, or | ||
// (2) a new value is added to an existing sort key | ||
ArrayList<PrefetchableIterable<TimestampedValue<T>>> pendingAddsInRange = new ArrayList<>(); | ||
for (Entry<Instant, Collection<T>> kv : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not super-clear on how Iterables work in Java. Does this actually copy the elements, or is it just wrapping the values in pendingAdds
in an Iterable? I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this actually copy the elements, or is it just wrapping the values in pendingAdds in an Iterable?
Iterables in java work like iterables in other languages. Basically, we use it to traverse an existing collection of elements without making a copy of them.
I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.
The PrefetchableIterable
interface in Beam is specifically for iterables that support prefetching. The function PrefetchableIterables.limit()
is applied on an iterable, resulting in a new iterable, which can be used to traverse its backend collection until the number of elements reaches a preset limit.
Here is the tricky part.
- Note that
pendingAddsInRange
is a list of iterables. For each of these iterables, when an iteration is performed at any time after its creation, the result will be truncated to the last element we see at the time we initialize the iterable withPrefetchableIterables.limit()
. In other words, appending a new element to the end of an existing key ofpendingAdds
would not affect the outcome above, because the iterable inpendingAddsInRange
is designed to be truncated to the size when it is created. - When we add a new key to
pendingAdds
, it won't change the outcome of traversing an pre-existingpendingAddsInRange
either, because the iterable of the new key is not included inpendingAddsInRange
. - When we delete an existing key from
pendingAdds
, we remove the key-value mapping, but not touch its value (i.e. the collection of elements). Therefore, the iterables in pre-existingpendingAddsInRange
will still work as the backend collection is accessible. Only when no iterables reference these collections will they be garbage collected.
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void testAdd() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to have some tests where we're partway through an Iterable when the element gets added? In all of the ones I see here we completely process each Iterable before we do any further operations on it (by calling Iterables.toArray
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I will add some more tests to cover this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also need to add something here:
beam/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto
Line 478 in fb7ba65
// TODO(https://github.com/apache/beam/issues/20486): Add protocol to support OrderedListState |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or are we going to piggy-back on multimap for this? (If so we should delete the TODO.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think we will need a separate one for orderedlist because although orderedlist and multimap have similar semantics, the code paths are totally different.
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
|
Reminder, please take a look at this pr: @kennknowles |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
waiting on author |
Reminder, please take a look at this pr: @Abacn |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
|
stop reviewer notifications |
Stopping reviewer notifications for this pull request: requested by reviewer |
First attempt to implement ordered list state for FnApi. Notice that caching has not been implemented yet.
Reference:
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.