Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FnAPI proto changes for ordered list state. #31092

Merged
merged 3 commits into from May 1, 2024

Conversation

robertwb
Copy link
Contributor

This is the proto changes only for #30317 to unblock runner-side implementations.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions github-actions bot added the model label Apr 24, 2024
@robertwb
Copy link
Contributor Author

R: @acrites

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@@ -1063,6 +1087,10 @@ message StateAppendRequest {
// Represents a part of a logical byte stream. Elements within
// the logical byte stream are encoded in the nested context and
// multiple append requests are concatenated together.
// For OrderedListState, elements of TimeStampedValue<T> should be encoded
// with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder<T>)), so that
// the request handler knows how to decode timestamps from the data without
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we refer to this as "sort key" above. Maybe just mention that sort keys should be encoded as timestamps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like TimestampedValueCoder is not well defined, and +1 to references to keys, not timestamps.

Should we use KV<VarInt, LP<value>>? Or is it worth defining a bigendian one? How do we want to treat negatives?

Copy link
Contributor

@shunping shunping Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like TimestampedValueCoder is not well defined

TimestampedValueCoder is defined as an inner class in TimeStampedValue

public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {

. It encodes the value first and then the timestamp.

Should we use KV<VarInt, LP>? Or is it worth defining a bigendian one? How do we want to treat negatives?

I use TimeStampedValue in the proto and implementation instead of a KV<,> because it is originally used in the OrderedListState interface

extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {
.
Another benefit of using TimeStampedValue is that we can reuse all the existing prefetched mechanisms without reimplementing them.

Here is a long discussion thread of why we choose to use timestamp as the sort key in the original design doc. As Reuven pointed out, timestamp is the most common sort key and we can easily converted it from and to an int64.

orderedList.add(TimestampedValue.of(element, Instant.of(sequence number)));

This is valid for both positives and negatives (which will be represented as timestamps prior to epoch 1970-01-01T00:00:00Z).

Copy link
Contributor Author

@robertwb robertwb Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimestampedValueCoder is not a well defined portable/cross-language Beam coder. We can continue to use TimestampedValue in the Java code (with a Coder that goes to and from this encoding) if it makes life easier, but IMHO we should be using a better defined coder. KV<VarInt64, LP<value>> seems like the most natural option here. (Another bonus: we don't have to dither about endianness or millis vs micros, even if it "usually" holds a timestamp.)

Copy link
Contributor

@shunping shunping May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds a bit not counter-intuitive, but the coder of TimestampedValue is not equivalent to that of KV<VarInt64, LP<value>>. Particularly, TimestampedValueCoder encodes/decodes the value first and then timestamp.

valueCoder.encode(windowedElem.getValue(), outStream);
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);

But KV<VarInt64, LP<value>> does that in the opposite order. In order to maintain the compatibility of these two coders and make the implementation simpler, how about we explicitly define the use of KV<LP<value>, VarInt64>> coder instead?

@robertwb

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given these are sort keys, let's keep them in the key slot. (In other words, I'd rather optimized for cleanliness of the spec over matching an existing internal implementation detail.)

It does not matter that the TimestampedValue$TimestampedValueCoder does not agree here, we would just use a different Coder<TimestampedValue>. (We need to explicitly ensure that the coder for T is LP anyway, so we shouldn't be inferring coders here.)

Copy link
Contributor

@shunping shunping May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable. I will go ahead to change the implementation I have right now to match this change once it is pushed. Thanks!

@shunping
Copy link
Contributor

Run GoPortable PreCommit

@shunping
Copy link
Contributor

Run Java PreCommit

1 similar comment
@shunping
Copy link
Contributor

Run Java PreCommit

@shunping
Copy link
Contributor

shunping commented Apr 30, 2024

All seven failed tests are related to macos, which is an issue related to using the latest macos artifact. @damccorm already addressed this in #31115 and #31123 by pinning an older version of macos artifact. Therefore, I think this PR should be safe to merge.

@robertwb robertwb merged commit 670e56f into apache:master May 1, 2024
83 of 87 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants