
storage: commit to some ordering between partitions in UPSERT #26965

Open
guswynn opened this issue May 7, 2024 · 3 comments · May be fixed by #27092
Labels
A-storage Area: storage

Comments

@guswynn
Contributor

guswynn commented May 7, 2024

A key changing its partition (right now, specifically Kafka partitions) is not supported by UPSERT sources, for good reason: there is no defined order between partitions, so the order in which the updates are applied is undefined.

Previous behavior

Before #24663, if you DID create an UPSERT source where a key's partition changed, we had the following behavior:

  • If the updates occur within the same mz time (i.e., they are reclocked the same) then choose the one with the higher partition.
  • If the updates occur in different mz times, choose the later one.

This is (afaiui) a non-definite collection: depending on how the exact same upstream data is reclocked, we could end up with collections that accumulate differently. However, once the data is reclocked, things work out fine, and we are able to resume an UPSERT ingestion correctly.
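For concreteness, a minimal sketch of that tie-breaking rule (hypothetical types, not the actual upsert operator code):

```rust
use std::cmp::max;

// Hypothetical stand-in for the per-key ordering data.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct Candidate {
    mz_time: u64,   // the reclocked Materialize timestamp
    partition: i32, // the Kafka partition the update arrived on
}

/// Pick the winning update for a key: a later mz time wins, and within
/// the same mz time the higher partition wins. Deriving `Ord` on
/// (mz_time, partition) encodes exactly that lexicographic rule.
fn winner(a: Candidate, b: Candidate) -> Candidate {
    max(a, b)
}

fn main() {
    let older = Candidate { mz_time: 1, partition: 5 };
    let newer = Candidate { mz_time: 2, partition: 0 };
    assert_eq!(winner(older, newer), newer); // later mz time wins regardless of partition
}
```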

Current behavior

After #24663, we now always choose the update from the larger partition, with one caveat: if we are resuming an ingestion, updates after the resumption frontier are chosen over anything that came before.
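Roughly sketched (hypothetical types again, and assuming the caveat means post-resumption updates always beat whatever was in state before the restart):

```rust
// Hypothetical representation of a candidate update for a key.
#[derive(Clone, Copy, Debug)]
struct Update {
    partition: i32,
    after_resume: bool, // ingested after the resumption frontier?
}

/// Choose between the value currently in upsert state and an incoming
/// update for the same key.
fn winner(current: Update, incoming: Update) -> Update {
    match (current.after_resume, incoming.after_resume) {
        // Anything read after the resumption frontier beats pre-resumption state.
        (false, true) => incoming,
        (true, false) => current,
        // Otherwise the larger partition wins (ties go to the incoming update).
        _ => {
            if incoming.partition >= current.partition {
                incoming
            } else {
                current
            }
        }
    }
}
```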

This is definite in the steady-state, but resumption can cause arbitrary orders, depending on the exact moment of resumption.

Committing to a decision

I think we should commit to the current behavior; while it is not strictly correct, for Kafka sources (or Kafka-like sources) where a key's partition CAN change, but only ever increase, it is the ideal behavior.

@petrosagg
Contributor

This is definite in the steady-state, but resumption can cause arbitrary orders, depending on the exact moment of resumption.

Noting it here too. This is a bug: there should be no discrepancies in the produced collection no matter how many restart points you insert. We need to store the mz timestamp in the upsert state and use that too when comparing updates.

@guswynn
Contributor Author

guswynn commented May 20, 2024

@petrosagg for keys whose partitions are only-increasing, we ARE definite. Handling keys whose partitions can go down, and handling resumptions, requires that we hold the partition of each message in upsert state, which can't be done (currently) unless the partition is part of the output relation, because during hydration we only have the output shard.

@petrosagg
Contributor

for keys whose partitions are only-increasing, we ARE definite;

Indeed. I didn't claim there aren't cases where we are definite. We're also definite when we process an empty topic /s But in all seriousness, given that there is a way to fix this, we should.

handling resumptions requires that we hold the partition of each message in upsert state,

I don't think so. The timestamp associated with each key in upsert state should be the tuple (mz_timestamp, partition, offset). This is because the (partition, offset) pair is only needed to disambiguate between updates that happen at the same mz timestamp, and so on rehydration we can initialize the upsert state with (resume_ts, 0, 0) without problems because we will only ever process data for timestamps greater than resume_ts.
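A rough sketch of that scheme (hypothetical names, not a concrete implementation):

```rust
// The per-key "timestamp" in upsert state becomes this tuple, compared
// lexicographically: mz timestamp first, (partition, offset) only as a tie-breaker.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct UpsertOrder {
    mz_timestamp: u64,
    partition: u64,
    offset: u64,
}

impl UpsertOrder {
    /// On rehydration we only know the resume timestamp, so seed every key
    /// with (resume_ts, 0, 0). All data processed afterwards has
    /// mz_timestamp > resume_ts, so the zeroed (partition, offset) can never
    /// decide a comparison.
    fn rehydrated(resume_ts: u64) -> Self {
        UpsertOrder { mz_timestamp: resume_ts, partition: 0, offset: 0 }
    }
}

/// An incoming update replaces the stored value iff its order is at least
/// the order recorded in state.
fn keep_incoming(current: UpsertOrder, incoming: UpsertOrder) -> bool {
    incoming >= current
}
```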

@bosconi added the A-storage Area: storage label Jun 10, 2024