A key changing its partition (right now, specifically Kafka partitions) is considered unsupported by UPSERT sources, for good reason: there is no defined order between partitions, so the order in which updates to that key occur is undefined.
Previous behavior
Before #24663, if you DID create an UPSERT source where a key's partition changed, we had the following behavior:
If the updates occur within the same mz time (i.e., they are reclocked the same) then choose the one with the higher partition.
If the updates occur in different mz times, choose the later one.
This is (afaiui) a non-definite collection: depending on how the exact same upstream data is reclocked, we could end up with collections that accumulate differently. However, once the data is reclocked, things work out fine, and we are able to resume an UPSERT ingestion correctly.
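To make the non-definiteness concrete, here is a minimal sketch of the pre-#24663 rule as described above. The `Update` struct and `previous_winner` function are hypothetical names invented for illustration, not Materialize code; the sketch only models a single key.

```rust
/// Hypothetical model of one upstream update for a single key.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Update {
    /// Timestamp assigned to the update by reclocking.
    mz_time: u64,
    /// Kafka partition the update was read from.
    partition: i32,
    /// Payload, reduced to an integer for the sketch.
    value: i64,
}

/// Pre-#24663 rule as described in this issue: the later mz time wins,
/// and ties within one mz time are broken by the higher partition.
fn previous_winner(updates: &[Update]) -> Option<Update> {
    updates
        .iter()
        .copied()
        .max_by_key(|u| (u.mz_time, u.partition))
}
```

Feeding the same two upstream updates through two different reclockings gives different winners: if both land at the same `mz_time`, the one from the higher partition wins; if the lower-partition update happens to be reclocked one tick later, it wins instead.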
Current behavior
After #24663, we now always choose the update from the larger partition, with one caveat: if we are resuming an ingestion, updates after the resumption frontier are chosen over anything else.
This is definite in the steady-state, but resumption can cause arbitrary orders, depending on the exact moment of resumption.
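The resumption caveat can be sketched as follows. This is an illustrative model only: `Stored`, `apply`, and `value_of` are hypothetical names, and breaking ties within one partition by offset is an assumption the issue does not spell out.

```rust
/// Hypothetical per-key upsert state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Stored {
    /// Value recovered from the output on resumption; its partition is unknown.
    Rehydrated { value: i64 },
    /// Value seen since the ingestion (re)started.
    Live { partition: i32, offset: u64, value: i64 },
}

/// Apply one incoming update under the post-#24663 rule as described above.
/// Assumption: within one partition, the larger offset wins.
fn apply(state: Option<Stored>, partition: i32, offset: u64, value: i64) -> Stored {
    let incoming = Stored::Live { partition, offset, value };
    match state {
        // Steady state: keep the stored value if it came from a larger
        // (partition, offset) position than the incoming update.
        Some(Stored::Live { partition: p, offset: o, value: v })
            if (p, o) > (partition, offset) =>
        {
            Stored::Live { partition: p, offset: o, value: v }
        }
        // Empty state, rehydrated state (partition unknown), or a smaller
        // stored position: the incoming update wins.
        _ => incoming,
    }
}

/// Extract the value regardless of how the state was produced.
fn value_of(s: Stored) -> i64 {
    match s {
        Stored::Rehydrated { value } | Stored::Live { value, .. } => value,
    }
}
```

In this model, an update from partition 1 followed by one from partition 0 keeps the partition-1 value in the steady state. If a restart falls between the two, the state is rehydrated without a partition, so the partition-0 update wins, which is the discrepancy described above.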
Committing to a decision
I think we should commit to the current behavior; while it is not strictly correct, for Kafka sources (or Kafka-like sources) where a key's partition CAN change, but can only ever increase, it is the ideal behavior.
This is definite in the steady-state, but resumption can cause arbitrary orders, depending on the exact moment of resumption.
Noting it here too: this is a bug. There should be no discrepancies in the produced collection no matter how many restart points you insert. We need to store the mz timestamp in the upsert state and use that too when comparing updates.
@petrosagg for keys whose partitions are only-increasing, we ARE definite; handling keys whose partitions can go down and handling resumptions requires that we hold the partition of each message in upsert state, which can't be done (currently) unless the partition is part of the output relation, because during hydration we only have the output shard
for keys whose partitions are only-increasing, we ARE definite;
Indeed. I didn't claim there aren't cases where we are definite. We're also definite when we process an empty topic /s. But in seriousness, given that there is a way to fix this, we should.
handling resumptions requires that we hold the partition of each message in upsert state,
I don't think so. The timestamp associated with each key in upsert state should be the tuple (mz_timestamp, partition, offset). This is because the (partition, offset) pair is only needed to disambiguate between updates that happen at the same mz timestamp, and so on rehydration we can initialize the upsert state with (resume_ts, 0, 0) without problems because we will only ever process data for timestamps greater than resume_ts.
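A minimal sketch of that proposal, under stated assumptions: `OrderKey`, `Entry`, `rehydrated`, and `upsert` are hypothetical names, not existing Materialize types, and `resume_ts` is taken to be a timestamp strictly below every timestamp processed after the restart, as the comment above assumes.

```rust
/// Hypothetical order key: fields are compared in declaration order,
/// so mz_timestamp dominates and (partition, offset) only breaks ties.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OrderKey {
    mz_timestamp: u64,
    partition: i32,
    offset: u64,
}

/// Hypothetical per-key upsert state entry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Entry {
    order: OrderKey,
    value: i64,
}

/// On rehydration the original partition and offset are unknown, so the
/// entry is initialized at (resume_ts, 0, 0). Assumption: every update
/// processed afterwards has a timestamp strictly greater than resume_ts.
fn rehydrated(resume_ts: u64, value: i64) -> Entry {
    Entry {
        order: OrderKey { mz_timestamp: resume_ts, partition: 0, offset: 0 },
        value,
    }
}

/// Keep whichever entry has the larger order key.
fn upsert(state: Option<Entry>, incoming: Entry) -> Entry {
    match state {
        Some(existing) if existing.order > incoming.order => existing,
        _ => incoming,
    }
}
```

Because a restart can only fall between timestamps, never inside one, the placeholder `(resume_ts, 0, 0)` is always smaller than any later update, while updates sharing a timestamp are still disambiguated by their real `(partition, offset)` pair.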