FnAPI proto changes for ordered list state. #31092

Merged
merged 3 commits into from
May 1, 2024
Changes from 2 commits
@@ -1021,6 +1021,29 @@ message StateKey {
bytes map_key = 5;
}

// Represents a request for an ordered list of values associated with a
// specified user key and window for a PTransform. See
// https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
// details.
//
// The response data stream will be a concatenation of all entries of sort key
// and V's associated with the specified user key and window.
// See https://s.apache.org/beam-fn-api-send-and-receive-data for further
// details.
message OrderedListUserState {
// (Required) The id of the PTransform containing user state.
string transform_id = 1;
// (Required) The id of the user state.
string user_state_id = 2;
// (Required) The window encoded in a nested context.
bytes window = 3;
// (Required) The key of the currently executing element encoded in a
// nested context.
bytes key = 4;
// (Required) The sort range encoded in a nested context.
OrderedListRange range = 5;
}

// (Required) One of the following state keys must be set.
oneof type {
Runner runner = 1;
@@ -1031,6 +1054,7 @@ message StateKey {
MultimapKeysValuesSideInput multimap_keys_values_side_input = 8;
MultimapKeysUserState multimap_keys_user_state = 6;
MultimapUserState multimap_user_state = 7;
OrderedListUserState ordered_list_user_state = 9;
}
}

@@ -1063,6 +1087,10 @@ message StateAppendRequest {
// Represents a part of a logical byte stream. Elements within
// the logical byte stream are encoded in the nested context and
// multiple append requests are concatenated together.
// For OrderedListState, elements of TimeStampedValue<T> should be encoded
// with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder<T>)), so that
// the request handler knows how to decode timestamps from the data without
Contributor

nit: we refer to this as "sort key" above. Maybe just mention that sort keys should be encoded as timestamps?

Contributor Author

Looks like TimestampedValueCoder is not well defined, and +1 to references to keys, not timestamps.

Should we use KV<VarInt, LP<value>>? Or is it worth defining a big-endian one? How do we want to treat negatives?

Contributor

@shunping shunping Apr 30, 2024

> Looks like TimestampedValueCoder is not well defined

TimestampedValueCoder is defined as a nested class in TimestampedValue:

public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {

It encodes the value first and then the timestamp.

> Should we use KV<VarInt, LP<value>>? Or is it worth defining a big-endian one? How do we want to treat negatives?

I use TimestampedValue in the proto and implementation instead of a KV<,> because it is what the OrderedListState interface already uses:

extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {

Another benefit of using TimestampedValue is that we can reuse all the existing prefetching mechanisms without reimplementing them.

Here is a long discussion thread in the original design doc on why we chose to use a timestamp as the sort key. As Reuven pointed out, a timestamp is the most common sort key, and it can easily be converted to and from an int64.

orderedList.add(TimestampedValue.of(element, new Instant(sequenceNumber)));

This is valid for both positives and negatives (which will be represented as timestamps prior to epoch 1970-01-01T00:00:00Z).
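To illustrate the int64-to-timestamp mapping described above, here is a minimal sketch. It uses `java.time.Instant` from the JDK as a stand-in for the Joda-Time `Instant` that Beam uses, and the class and method names are hypothetical.

```java
import java.time.Instant;

/** Hypothetical illustration: treats an int64 sort key as milliseconds since the epoch. */
public class SortKeyAsTimestamp {

  /** Interprets the sort key as epoch milliseconds; negative keys land before 1970. */
  static Instant toTimestamp(long sortKey) {
    return Instant.ofEpochMilli(sortKey);
  }

  /** Recovers the int64 sort key from a timestamp. */
  static long toSortKey(Instant timestamp) {
    return timestamp.toEpochMilli();
  }

  public static void main(String[] args) {
    System.out.println(toTimestamp(0L));   // 1970-01-01T00:00:00Z
    System.out.println(toTimestamp(-1L));  // 1969-12-31T23:59:59.999Z
    System.out.println(toSortKey(toTimestamp(-42L)));  // -42
  }
}
```

Under this assumption the natural ordering of timestamps matches the signed ordering of the underlying int64 values.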

Contributor Author

@robertwb robertwb Apr 30, 2024

TimestampedValueCoder is not a well defined portable/cross-language Beam coder. We can continue to use TimestampedValue in the Java code (with a Coder that goes to and from this encoding) if it makes life easier, but IMHO we should be using a better defined coder. KV<VarInt64, LP<value>> seems like the most natural option here. (Another bonus: we don't have to dither about endianness or millis vs micros, even if it "usually" holds a timestamp.)

Contributor

@shunping shunping May 1, 2024

This may sound a bit counter-intuitive, but the coder of TimestampedValue is not equivalent to that of KV<VarInt64, LP<value>>. In particular, TimestampedValueCoder encodes/decodes the value first and then the timestamp.

valueCoder.encode(windowedElem.getValue(), outStream);
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);

But KV<VarInt64, LP<value>> does that in the opposite order. To keep these two coders compatible and make the implementation simpler, how about we explicitly define the use of a KV<LP<value>, VarInt64> coder instead?

@robertwb

Contributor Author

Given these are sort keys, let's keep them in the key slot. (In other words, I'd rather optimize for cleanliness of the spec over matching an existing internal implementation detail.)

It does not matter that the TimestampedValue$TimestampedValueCoder does not agree here, we would just use a different Coder<TimestampedValue>. (We need to explicitly ensure that the coder for T is LP anyway, so we shouldn't be inferring coders here.)
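As a rough illustration of the key-first layout discussed here, the sketch below writes the sort key as a varint followed by a length-prefixed value. It assumes the varint is a base-128 encoding of the 64-bit value (least-significant group first) and that the length prefix is a varint byte count. The class and method names are hypothetical and this is not Beam's coder implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a KV<VarInt64, LP<value>> byte layout: sort key first, then value. */
public class OrderedListEntryCodec {

  /** Decoded entry: an int64 sort key and the raw value bytes. */
  static final class Entry {
    final long sortKey;
    final byte[] value;

    Entry(long sortKey, byte[] value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  /** Writes v as a base-128 varint, treating the long as 64 unsigned bits. */
  static void writeVarLong(long v, OutputStream out) throws IOException {
    do {
      int bits = (int) (v & 0x7F);
      v >>>= 7;
      out.write(v != 0 ? (bits | 0x80) : bits);
    } while (v != 0);
  }

  /** Reads a varint written by writeVarLong. */
  static long readVarLong(InputStream in) throws IOException {
    long result = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("truncated varint");
      }
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
      if (shift >= 70) {
        throw new IOException("varint longer than 10 bytes");
      }
    }
  }

  /** Encodes one entry as: varint(sortKey), varint(value.length), value bytes. */
  static byte[] encodeEntry(long sortKey, byte[] value) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      writeVarLong(sortKey, out);
      writeVarLong(value.length, out);
      out.write(value);
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes one entry; the sort key can be read without understanding the value bytes. */
  static Entry decodeEntry(byte[] data) {
    try {
      InputStream in = new ByteArrayInputStream(data);
      long sortKey = readVarLong(in);
      int length = (int) readVarLong(in);
      byte[] value = new byte[length];
      if (in.readNBytes(value, 0, length) != length) {
        throw new EOFException("truncated value");
      }
      return new Entry(sortKey, value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encodeEntry(300L, new byte[] {0x61, 0x62});
    Entry decoded = decodeEntry(encoded);
    System.out.println(encoded.length + " bytes, sort key " + decoded.sortKey);
  }
}
```

With this layout a handler can read the sort key and skip over the value using only the length prefix. Under the stated varint assumption, a negative sort key occupies ten bytes.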

Contributor

@shunping shunping May 1, 2024

Sounds reasonable. I will go ahead and change my current implementation to match this change once it is pushed. Thanks!

// decoding the value bytes. Returned data will be encoded the same way.
bytes data = 1;
}

@@ -1075,6 +1103,12 @@ message StateClearRequest {}
// A response to clear state.
message StateClearResponse {}

// A message describes a sort key range [start, end).
message OrderedListRange {
int64 start = 1;
int64 end = 2;
}
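
The half-open range [start, end) above can be illustrated with a small in-memory sketch. It assumes entries are kept sorted by signed int64 sort key and that several values may share a key. The class and its methods are hypothetical and do not reflect any runner's actual state backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of ordered list state keyed by an int64 sort key. */
public class OrderedListRangeSketch {

  // Sort key -> values added under that key, in insertion order.
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  /** Adds a value under the given sort key. */
  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Returns all values whose sort key k satisfies start <= k < end, in key order. */
  List<String> read(long start, long end) {
    List<String> result = new ArrayList<>();
    if (start >= end) {
      return result; // Empty range; also avoids subMap rejecting start > end.
    }
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      result.addAll(values);
    }
    return result;
  }

  public static void main(String[] args) {
    OrderedListRangeSketch state = new OrderedListRangeSketch();
    state.add(-5L, "a");
    state.add(0L, "b");
    state.add(10L, "c");
    state.add(20L, "d");
    System.out.println(state.read(-5L, 20L)); // [a, b, c]
  }
}
```

The start bound is inclusive and the end bound is exclusive, so the entry at key 20 is not returned for the range [-5, 20).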

/*
* Logging API
*