feat: rework pubsub source to support parallel read and at-least-once #16733

Merged
merged 16 commits into from
May 21, 2024

Conversation

xxchan
Member

@xxchan xxchan commented May 13, 2024

Signed-off-by: xxchan <xxchan22f@gmail.com>

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

close #16572

Behavior changes

  • Ack on checkpoint for Pub/Sub source.
    • (message) change the offset column from ts to ack_id
    • (split) don't update start_offset (for seek)
    • (reader) don't ack on read
    • refactor WaitCheckpointWorker to support update_task_on_chunk: store all ack_ids before next checkpoint
  • Add (back) the pubsub.parallelism WITH option. (It was previously called split_count, but I like parallelism better because "split" is not a user-facing concept. See feat: pre-release pubsub #14531.)
    • future work: use default parallelism when not specified. Like Flink SourceEnumerator enumContext.currentParallelism(). We don't have such a context in source manager yet.
  • Removed the restriction on retain_acked_messages.
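
The ack-on-checkpoint flow listed above can be sketched roughly as follows. This is a minimal illustration, not the actual WaitCheckpointWorker: the `Acker` trait, `AckOnCheckpoint`, `RecordingAcker`, and `on_checkpoint` are hypothetical names, and only `update_task_on_chunk` is borrowed from the PR description. The idea is that ack IDs are buffered per chunk and sent to the broker only after a checkpoint.

```rust
use std::mem;

/// Hypothetical stand-in for the Pub/Sub client's acknowledge call.
pub trait Acker {
    fn ack(&mut self, ack_ids: Vec<String>);
}

/// Buffers ack IDs between checkpoints (illustrative; not RisingWave's real type).
pub struct AckOnCheckpoint<A: Acker> {
    acker: A,
    pending: Vec<String>,
}

impl<A: Acker> AckOnCheckpoint<A> {
    pub fn new(acker: A) -> Self {
        Self { acker, pending: Vec::new() }
    }

    /// Called for every chunk read: remember the ack IDs, but do not ack yet.
    pub fn update_task_on_chunk(&mut self, ack_ids: impl IntoIterator<Item = String>) {
        self.pending.extend(ack_ids);
    }

    /// Called once a checkpoint has completed: ack everything buffered so far.
    pub fn on_checkpoint(&mut self) {
        if self.pending.is_empty() {
            return;
        }
        // Take the buffer, leaving an empty one for the next epoch.
        let batch = mem::take(&mut self.pending);
        self.acker.ack(batch);
    }

    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }

    pub fn acker(&self) -> &A {
        &self.acker
    }
}

/// Test double that records each acknowledged batch.
#[derive(Default)]
pub struct RecordingAcker {
    pub batches: Vec<Vec<String>>,
}

impl Acker for RecordingAcker {
    fn ack(&mut self, ack_ids: Vec<String>) {
        self.batches.push(ack_ids);
    }
}
```

With this shape, a reader that crashes before `on_checkpoint` runs has acked nothing for the current epoch, so the broker still holds those messages.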

Note: although the changes are large, I think none of them is breaking.

Explanations

Previously, we acked immediately and relied on retain_acked_messages plus seek-to-timestamp to replay messages. This is awkward, and it is not how Pub/Sub is designed to be used.

Now we ack on checkpoint and don't seek at all. It's stateless on RisingWave's side. This corresponds more closely to queueing semantics (delete after ack).
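
To illustrate why delete-after-ack gives at-least-once delivery, here is a toy in-memory model. It is a sketch under simplifying assumptions, not the Pub/Sub client API: `MockSubscription` and `crash_before_checkpoint` are made-up names, ack IDs are plain integers that stay stable across deliveries, and every pull returns all outstanding messages rather than waiting for an ack deadline.

```rust
use std::collections::BTreeMap;

/// Toy subscription with delete-after-ack semantics (all names are illustrative).
#[derive(Default)]
pub struct MockSubscription {
    next_id: u64,
    /// Messages that have not been acknowledged yet, keyed by ack ID.
    unacked: BTreeMap<u64, String>,
}

impl MockSubscription {
    pub fn publish(&mut self, payload: &str) {
        self.unacked.insert(self.next_id, payload.to_string());
        self.next_id += 1;
    }

    /// Delivers every message that is still unacknowledged, in ack-ID order.
    pub fn pull(&self) -> Vec<(u64, String)> {
        self.unacked.iter().map(|(id, p)| (*id, p.clone())).collect()
    }

    /// Acknowledged messages are deleted and never delivered again.
    pub fn ack(&mut self, ack_ids: &[u64]) {
        for id in ack_ids {
            self.unacked.remove(id);
        }
    }
}

/// Reads once, "crashes" before the checkpoint (so nothing is acked), then
/// reads again and acks. Returns the payloads seen by each read.
pub fn crash_before_checkpoint(sub: &mut MockSubscription) -> (Vec<String>, Vec<String>) {
    let first: Vec<String> = sub.pull().into_iter().map(|(_, p)| p).collect();
    // Simulated crash: the buffered ack IDs are lost and nothing was acked.
    let redelivered = sub.pull();
    let ids: Vec<u64> = redelivered.iter().map(|(id, _)| *id).collect();
    let second: Vec<String> = redelivered.into_iter().map(|(_, p)| p).collect();
    // This time the checkpoint succeeds, so the messages are acked.
    sub.ack(&ids);
    (first, second)
}
```

In this model the second read sees the same payloads as the first: nothing is lost, and the cost is possible duplicates, which matches the expectation in test case 1 below.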

Testing

  • case 1: Set checkpoint_frequency to a very large value (and SET visibility_mode TO 'all'). Create a table and check that data loads successfully. Kill and restart RisingWave, then check that data still loads with nothing missing (duplicates are likely).
  • case 2: Use the normal config. Create a table and check that data loads successfully without duplicates.
  • (manual) test on emulator
  • (manual) test on cloud

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

Add pubsub.parallelism WITH option for PubSub source, which specifies the number of parallel consumers to run for the subscription. If not specified, the parallelism will be one.
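
As a rough sketch of the option handling described in this note, the following shows one way to read `pubsub.parallelism` with a default of one and derive one split per consumer. The function names, the split ID format, and the rejection of `0` are assumptions made for illustration; they are not taken from the RisingWave code.

```rust
use std::collections::HashMap;

/// Reads the `pubsub.parallelism` WITH option, defaulting to 1 when absent.
/// Rejecting 0 is an assumption of this sketch.
pub fn parse_parallelism(with_options: &HashMap<String, String>) -> Result<u32, String> {
    match with_options.get("pubsub.parallelism") {
        None => Ok(1),
        Some(raw) => match raw.trim().parse::<u32>() {
            Ok(0) => Err("pubsub.parallelism must be at least 1".to_string()),
            Ok(n) => Ok(n),
            Err(e) => Err(format!("invalid pubsub.parallelism {:?}: {}", raw, e)),
        },
    }
}

/// Produces one hypothetical split ID per parallel consumer of the subscription.
pub fn enumerate_splits(subscription: &str, parallelism: u32) -> Vec<String> {
    (0..parallelism)
        .map(|i| format!("{}-{}", subscription, i))
        .collect()
}
```

Because the broker distributes messages among consumers of the same subscription, the splits here carry no offset; they only determine how many readers are started.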

Signed-off-by: xxchan <xxchan22f@gmail.com>
- (message) change the offset column from ts to ack_id
- (split) don't update start_offset
- (reader) don't ack on read
- refactor WaitCheckpointWorker to support update_task_on_chunk

Contributor

@tabVersion tabVersion left a comment


the pr generally LGTM
please update in the pr's note that pubsub does not rely on the SplitImpl's offset to start from a position. It prefers letting the broker side deal with offset, which is different from other connectors.

btw, idk if reusing offset in the SplitImpl is a good practice, it may be confusing if users want to query from the state table.

src/connector/src/source/google_pubsub/mod.rs
Member Author

xxchan commented May 14, 2024

> please update in the pr's note that pubsub does not rely on the SplitImpl's offset to start from a position. It prefers letting the broker side deal with offset, which is different from other connectors.

Sure. BTW, I've mentioned that it will be stateless from the RisingWave side.

> btw, idk if reusing offset in the SplitImpl is a good practice, it may be confusing if users want to query from the state table.

I don't understand your point. What do you mean by reusing offset? Besides, SplitImpl actually doesn't maintain an offset any more.

Anyway, I don't think the average user cares about querying the state table.

Comment on lines 31 to 36
/// pubsub reader.
pub(crate) start_offset: Option<String>,

/// `stop_offset` is a numeric timestamp.
/// When not `None`, the `PubsubReader` stops reading messages when the `offset` property of
/// the `SourceMessage` is greater than or equal to the `stop_offset`.
pub(crate) stop_offset: Option<String>,
#[serde(rename = "stop_offset")]
#[serde(skip_serializing)]
pub(crate) __deprecated_stop_offset: Option<String>,
Member Author


I guess start_offset can also be deprecated.

stop_offset is actually never used.

@xxchan xxchan requested a review from tabVersion May 17, 2024 02:53
Contributor

@StrikeW StrikeW left a comment


LGTM to WaitCheckpointWorker

…limit: 524288 bytes.

Signed-off-by: xxchan <xxchan22f@gmail.com>
Member Author

xxchan commented May 20, 2024

simple manual load testing

image

@xxchan xxchan enabled auto-merge May 20, 2024 16:48
@xxchan xxchan added this pull request to the merge queue May 21, 2024
Merged via the queue into main with commit 5dc3861 May 21, 2024
27 of 28 checks passed
@xxchan xxchan deleted the xxchan/pubsub-2 branch May 21, 2024 04:48
Development

Successfully merging this pull request may close these issues.

allow >1 paralellism for pubsub source to increase ingestion performance
3 participants