New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FnAPI proto changes for ordered list state. #31092
Conversation
R: @acrites |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
@@ -1063,6 +1087,10 @@ message StateAppendRequest { | |||
// Represents a part of a logical byte stream. Elements within | |||
// the logical byte stream are encoded in the nested context and | |||
// multiple append requests are concatenated together. | |||
// For OrderedListState, elements of TimeStampedValue<T> should be encoded | |||
// with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder<T>)), so that | |||
// the request handler knows how to decode timestamps from the data without |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we refer to this as "sort key" above. Maybe just mention that sort keys should be encoded as timestamps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like TimestampedValueCoder is not well defined, and +1 to references to keys, not timestamps.
Should we use KV<VarInt, LP<value>>
? Or is it worth defining a bigendian one? How do we want to treat negatives?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like TimestampedValueCoder is not well defined
TimestampedValueCoder is defined as an inner class in TimeStampedValue
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
Line 88 in 413af12
public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> { |
. It encodes the value first and then the timestamp.
Should we use KV<VarInt, LP>? Or is it worth defining a bigendian one? How do we want to treat negatives?
I use TimeStampedValue in the proto and implementation instead of a KV<,> because it is originally used in the OrderedListState interface
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
Line 29 in f0d0605
extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> { |
Another benefit of using TimeStampedValue is that we can reuse all the existing prefetched mechanisms without reimplementing them.
Here is a long discussion thread of why we choose to use timestamp as the sort key in the original design doc. As Reuven pointed out, timestamp is the most common sort key and we can easily converted it from and to an int64.
orderedList.add(TimestampedValue.of(element, Instant.of(sequence number)));
This is valid for both positives and negatives (which will be represented as timestamps prior to epoch 1970-01-01T00:00:00Z).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimestampedValueCoder is not a well defined portable/cross-language Beam coder. We can continue to use TimestampedValue in the Java code (with a Coder that goes to and from this encoding) if it makes life easier, but IMHO we should be using a better defined coder. KV<VarInt64, LP<value>>
seems like the most natural option here. (Another bonus: we don't have to dither about endianness or millis vs micros, even if it "usually" holds a timestamp.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds a bit not counter-intuitive, but the coder of TimestampedValue
is not equivalent to that of KV<VarInt64, LP<value>>
. Particularly, TimestampedValueCoder
encodes/decodes the value first and then timestamp.
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
Lines 110 to 111 in 413af12
valueCoder.encode(windowedElem.getValue(), outStream); | |
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); |
But KV<VarInt64, LP<value>>
does that in the opposite order. In order to maintain the compatibility of these two coders and make the implementation simpler, how about we explicitly define the use of KV<LP<value>, VarInt64>>
coder instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given these are sort keys, let's keep them in the key slot. (In other words, I'd rather optimized for cleanliness of the spec over matching an existing internal implementation detail.)
It does not matter that the TimestampedValue$TimestampedValueCoder
does not agree here, we would just use a different Coder<TimestampedValue>
. (We need to explicitly ensure that the coder for T is LP anyway, so we shouldn't be inferring coders here.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable. I will go ahead to change the implementation I have right now to match this change once it is pushed. Thanks!
Run GoPortable PreCommit |
Run Java PreCommit |
1 similar comment
Run Java PreCommit |
This is the proto changes only for #30317 to unblock runner-side implementations.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.