FnAPI proto changes for ordered list state. #31092
Merged
nit: we refer to this as "sort key" above. Maybe just mention that sort keys should be encoded as timestamps?
Looks like TimestampedValueCoder is not well defined, and +1 to references to keys, not timestamps.
Should we use `KV<VarInt, LP<value>>`? Or is it worth defining a big-endian one? How do we want to treat negatives?
TimestampedValueCoder is defined as an inner class in `TimestampedValue` (beam/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java, line 88 in 413af12). It encodes the value first and then the timestamp.
I use `TimestampedValue` in the proto and implementation instead of a `KV<,>` because it is what the OrderedListState interface originally uses (beam/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java, line 29 in f0d0605).
Another benefit of using `TimestampedValue` is that we can reuse all the existing prefetching mechanisms without reimplementing them.
There is a long discussion thread in the original design doc about why we chose to use a timestamp as the sort key. As Reuven pointed out, a timestamp is the most common sort key, and it can easily be converted to and from an int64.
orderedList.add(TimestampedValue.of(element, Instant.ofEpochMilli(sequenceNumber)));
This is valid for both positives and negatives (which will be represented as timestamps prior to epoch 1970-01-01T00:00:00Z).
TimestampedValueCoder is not a well-defined portable/cross-language Beam coder. We can continue to use TimestampedValue in the Java code (with a Coder that goes to and from this encoding) if it makes life easier, but IMHO we should be using a better-defined coder.
`KV<VarInt64, LP<value>>` seems like the most natural option here. (Another bonus: we don't have to dither about endianness or millis vs micros, even if it "usually" holds a timestamp.)
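To make the question about negatives concrete, here is a minimal sketch of how an int64 sort key could be written as a varint. It assumes a plain base-128 encoding of the two's-complement bit pattern with no zigzag step, which is my understanding of Beam's varint coder but is not confirmed in this thread. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical helper, not part of Beam: illustrates a base-128 varint
// over the raw 64-bit two's-complement pattern (assumed, no zigzag).
class VarIntSortKeySketch {

    // Emits 7 bits per byte, least significant group first; the high bit
    // of each byte marks "more bytes follow".
    public static byte[] encodeVarInt64(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int bits = (int) (v & 0x7F);
            v >>>= 7; // unsigned shift, so negative keys terminate too
            if (v != 0) {
                bits |= 0x80;
            }
            out.write(bits);
        } while (v != 0);
        return out.toByteArray();
    }

    // Reverses encodeVarInt64 for a buffer that starts with one varint.
    public static long decodeVarInt64(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return result;
    }
}
```

Under this assumption, negative sort keys round-trip correctly but always occupy 10 bytes, while small non-negative keys take a single byte.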
This sounds a bit counter-intuitive, but the coder of `TimestampedValue` is not equivalent to that of `KV<VarInt64, LP<value>>`. In particular, `TimestampedValueCoder` encodes/decodes the value first and then the timestamp (beam/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java, lines 110 to 111 in 413af12), but `KV<VarInt64, LP<value>>` does that in the opposite order. To keep these two coders compatible and make the implementation simpler, how about we explicitly define the use of a `KV<LP<value>, VarInt64>` coder instead? @robertwb
Given these are sort keys, let's keep them in the key slot. (In other words, I'd rather optimize for cleanliness of the spec over matching an existing internal implementation detail.)
It does not matter that the `TimestampedValue$TimestampedValueCoder` does not agree here; we would just use a different `Coder<TimestampedValue>`. (We need to explicitly ensure that the coder for T is LP anyway, so we shouldn't be inferring coders here.)
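As a rough illustration of the key-first layout agreed on here, the sketch below writes one ordered-list entry as a varint sort key followed by a length-prefixed value, and reads it back. It uses only the JDK rather than Beam's `Coder` classes. The varint format (base-128, no zigzag) and the varint length prefix are assumptions about how `VarInt64` and `LP` behave, and every name in the block is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of a KV<VarInt64, LP<value>>-style byte layout.
class OrderedListEntrySketch {

    // Simple holder for a decoded entry.
    static final class Entry {
        final long sortKey;
        final byte[] value;

        Entry(long sortKey, byte[] value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Assumed varint format: 7 bits per byte, low group first, no zigzag.
    static void writeVarInt64(long v, ByteArrayOutputStream out) {
        do {
            int bits = (int) (v & 0x7F);
            v >>>= 7;
            if (v != 0) {
                bits |= 0x80;
            }
            out.write(bits);
        } while (v != 0);
    }

    static long readVarInt64(ByteArrayInputStream in) {
        long result = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) {
                throw new IllegalStateException("truncated varint");
            }
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // Sort key goes first (the "key slot"), then the length-prefixed value.
    public static byte[] encodeEntry(long sortKey, byte[] valueBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt64(sortKey, out);
        writeVarInt64(valueBytes.length, out);
        out.write(valueBytes, 0, valueBytes.length);
        return out.toByteArray();
    }

    public static Entry decodeEntry(byte[] bytes) {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        long sortKey = readVarInt64(in);
        int length = (int) readVarInt64(in);
        byte[] value = new byte[length];
        if (length > 0 && in.read(value, 0, length) != length) {
            throw new IllegalStateException("truncated value");
        }
        return new Entry(sortKey, value);
    }
}
```

A Java SDK could keep exposing `TimestampedValue` to users and convert to and from this layout at the boundary, treating the timestamp's millisecond value as the sort key.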
Sounds reasonable. I will go ahead and change the implementation I have right now to match this change once it is pushed. Thanks!