
feat: a general commit back framework for source #16736

tabVersion opened this issue May 13, 2024 · 6 comments

@tabVersion (Contributor) commented May 13, 2024

Hi all, I am revisiting the issue of implementing a general ack mechanism in source. Thanks @xxchan for mentioning this.

My major questions are:

  1. An MQ with acks has a timeout on its staging queue. If a message is not acked within the timeout, it goes back to the delivery queue and is delivered to the clients again, so we can get a message twice in some cases. For example, an MQ's timeout is 30 min and at that time RW is experiencing heavy load, and it takes 35 min to finish one epoch. Then all data in the epoch will be delivered twice (fails to achieve exactly-once semantics, and makes future traffic heavier). Is this acceptable, or should we proactively trigger a rollback in advance to keep the promise?
  2. Unlike a CDC-specific implementation, a general framework has to retain every message's ack-id. With a 30-min sliding time window, we may have to persist millions of records (see the rough estimate below). Shall we introduce a log store for it?
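For a rough sense of scale (illustrative numbers only, not measurements): at 10,000 messages/s, a 30-minute window covers 10,000 × 1,800 = 18 million ack-ids; even at a few dozen bytes per ack-id, that is on the order of hundreds of MB of state per source.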

BTW, this situation also applies to a persistent Pulsar source with 0 retention.

@yuhao-su can you share more about the case? IIRC, a non-persistent Pulsar topic cannot seek to a specific position, and I think that is expected.

Originally posted by @tabVersion in #15464 (comment)

@github-actions bot added this to the release-1.10 milestone on May 13, 2024
@yuhao-su (Contributor) commented:

A non-persistent Pulsar topic is not the same as 0 retention.

With 0 retention, data is still persisted, but it is not kept after it has been acked by all subscriptions. In our case, we should ack a message only after it has been committed.

More details in https://pulsar.apache.org/docs/next/cookbooks-retention-expiry/

@xxchan (Member) commented May 14, 2024

Thanks for raising the questions. First of all, I already have a WIP PR #16733 if you want to preview the implementation details I have in mind.

For question 1:

fails to achieve exactly-once semantics

I think @fuyufjh answered this:
"For most cases, exactly-once is less important than multi-threading. Duplications only happen in some corner cases such as scale out/in, and many users are okay with it." https://risingwave-labs.slack.com/archives/C0606NNR74P/p1715048506103469?thread_ts=1714493567.060129&cid=C0606NNR74P

makes future traffic heavier

Do you mean heavier on the RW side or the MQ side? If the former, I think it's handled by backpressure, so no worries. Not sure about the effect of a long ack timeout on the MQ's side, though.

should we proactively trigger a rollback in advance to keep the promise

What does this mean?

For question 2:

We don't need to persist the ack-ids. They are just kept in memory. I think it's OK to lose them and let upstream resend the unacked messages.
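For concreteness, here is a minimal sketch of that in-memory flow. The names (`AckId`, `SourceAcker`, `CommitBackState`) are hypothetical and not taken from PR #16733:

```rust
use std::collections::BTreeMap;

/// Hypothetical opaque ack handle (e.g. a Pub/Sub ack_id or a Pulsar MessageId).
type AckId = Vec<u8>;
type Epoch = u64;

/// Hypothetical trait a connector would implement to talk to the upstream MQ.
trait SourceAcker {
    fn ack(&mut self, ids: &[AckId]);
}

/// Ack-ids are buffered only in memory, keyed by the epoch that read them.
#[derive(Default)]
struct CommitBackState {
    pending: BTreeMap<Epoch, Vec<AckId>>,
}

impl CommitBackState {
    /// Called for every message the source reader emits.
    fn stage(&mut self, epoch: Epoch, id: AckId) {
        self.pending.entry(epoch).or_default().push(id);
    }

    /// Called once the barrier for `epoch` is committed: ack every message
    /// read in that epoch or earlier, then forget them.
    fn commit(&mut self, epoch: Epoch, acker: &mut impl SourceAcker) {
        let not_yet_committed = self.pending.split_off(&(epoch + 1));
        let committed = std::mem::replace(&mut self.pending, not_yet_committed);
        for (_, ids) in committed {
            acker.ack(&ids);
        }
    }
    // On recovery the map is simply empty again: unacked messages time out
    // upstream and get redelivered, which gives at-least-once delivery.
}
```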

@fuyufjh (Contributor) commented May 14, 2024

for example, an MQ's timeout is 30 min and at that time RW is experiencing heavy load, and it takes 35 min to finish one epoch. Then all data in the epoch will be delivered twice (fails to achieve exactly-once semantics, and makes future traffic heavier)

Got your concern. Indeed, we must be careful when using a source with an ack timeout; otherwise, if it times out before our epoch commits, the message will be sent again in the next epoch, effectively causing a livelock.

In the case of Google Pub/Sub, they provide the modifyAckDeadline API to extend the deadline. I suppose most pub/sub systems (that don't rely on offsets) must have similar functions to provide exactly-once delivery.
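As a sketch of how a source could use such an API while an epoch is in flight (the `DeadlineExtender` trait and `keep_alive` function below are hypothetical stand-ins, not a specific client library):

```rust
use std::time::Duration;

/// Hypothetical wrapper around the MQ client's deadline-extension call
/// (e.g. what Google Pub/Sub exposes as modifyAckDeadline).
trait DeadlineExtender {
    fn modify_ack_deadline(&mut self, ack_ids: &[String], extend_to: Duration);
}

/// Keep unacked messages alive while their epoch is still in flight by
/// refreshing the deadline well before it expires.
fn keep_alive(
    client: &mut impl DeadlineExtender,
    in_flight: &[String],
    ack_deadline: Duration,
    epoch_committed: impl Fn() -> bool,
) {
    // Refresh at half the deadline so one missed tick is not fatal.
    let tick = ack_deadline / 2;
    while !epoch_committed() {
        client.modify_ack_deadline(in_flight, ack_deadline);
        std::thread::sleep(tick);
    }
}
```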

Unlike a CDC-specific implementation, a general framework has to retain every message's ack-id. With a 30-min sliding time window, we may have to persist millions of records. Shall we introduce a log store for it?

A log store would introduce huge complexity. Please just ignore this extreme case for now; a 30-min barrier will cause way more problems than this.

@xxchan (Member) commented May 14, 2024

BTW, Pulsar supports log semantics and cumulative ack, so we just need to commit the latest offset, like CDC.
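A rough sketch of what committing with cumulative ack could look like (the `CumulativeAck` trait below is a hypothetical stand-in for the Pulsar client, not its actual API):

```rust
/// Hypothetical consumer handle; `cumulative_ack` stands in for Pulsar's
/// cumulative acknowledgment, which acks everything up to a given MessageId.
trait CumulativeAck {
    type MessageId: Ord;
    fn cumulative_ack(&mut self, up_to: &Self::MessageId);
}

/// With cumulative ack, committing an epoch only needs the largest MessageId
/// read in that epoch, exactly like committing a single offset for Kafka/CDC.
fn commit_epoch<C: CumulativeAck>(consumer: &mut C, ids_in_epoch: &[C::MessageId]) {
    if let Some(max_id) = ids_in_epoch.iter().max() {
        consumer.cumulative_ack(max_id);
    }
}
```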

Also, it seems we currently don't ack for Pulsar at all, so Pulsar will not reclaim the consumed data (the problem we met for Pulsar).

By default, Pulsar stores all unacknowledged messages forever.

It will not clean data unless

  1. the topic is non-persistent, or
  2. a TTL is set.

Note that TTL (which automatically acks) is different from retention.

To further clarify, there are actually 2 issues related to ack:

  1. Correctness (at least once), i.e., don't let the upstream clean uncommitted data. This is relevant to Pub/Sub and Pulsar with 0 retention.
  2. Allowing the upstream to clean data after commit. This is relevant to CDC and Pulsar with no TTL.

P.S. Pub/Sub's retention duration is not related to ack.
It has the configs message_retention_duration and retain_acknowledged_messages. Comparatively, Pulsar has retention and TTL.

@xxchan (Member) commented May 14, 2024

For reference, some facts about ack deadlines:

Flink recommends:

For this reason it’s recommended to have a (much) lower checkpoint interval than acknowledgement deadline.

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/#at-least-once-guarantee

Pubsub Acknowledgement deadline options:

Default value = 10 seconds
Minimum value = 10 seconds
Maximum value = 600 seconds

https://cloud.google.com/pubsub/docs/subscription-properties#ack_deadline

@tabVersion (Contributor, Author) commented:

Thanks for answering the questions raised above.

should we proactively trigger a rollback in advance to keep the promise

What does this mean?

Take the example above: a 30-min MQ-side timeout while the barrier latency is 35 min. RW has received all the data, but the MQ thinks the messages were lost, so RW will receive the same data once more in the future.
For Pub/Sub and MQTT, we do not give an exactly-once promise, so it should be fine. But for Pulsar, I am not quite sure how to explain the degradation.

Also, there is a chance of losing data when the barrier latency remains high: messages that fail delivery multiple times can go into the dead-letter queue and will not be delivered again (this is configurable, just FYI: https://cloud.google.com/pubsub/docs/handling-failures).

How dead-letter topics work with Pub/Sub
A dead-letter topic is a subscription property, not a topic property. This means that you set a dead-letter topic when you create a subscription, not when you create a topic.

If you create a dead-letter topic, you can set the following subscription properties:

Maximum number of delivery attempts: A numeric value that signifies the number of delivery attempts that Pub/Sub makes for a specific message. If the subscriber client cannot acknowledge the message within the configured number of delivery attempts, the message is forwarded to a dead-letter topic.

Default value = 5
Maximum value = 100
Minimum value = 5
Project with the dead-letter topic: If the dead-letter topic is in a different project from the subscription, you must specify the project with the dead-letter topic. Set the dead-letter topic to a different topic from the topic to which the subscription is attached.

How maximum delivery attempts are calculated

Pub/Sub only counts delivery attempts when a dead-letter topic is configured correctly and includes the correct IAM permissions.

The maximum number of delivery attempts is approximate because Pub/Sub forwards undeliverable messages on a best-effort basis.

The tracked number of delivery attempts for a message may also reset to zero, especially for a pull subscription with inactive subscribers. As a result, the messages might be delivered to the subscriber client more times than the configured maximum number of delivery attempts.
