-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
storage: wire up new reclock implementation #27058
Conversation
482e57a
to
5f2462c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally happy with this (look at the minus red!!), a few nits!
Its unclear what the ci issues are...resumption frontier issues?
@@ -35,7 +35,6 @@ pub(crate) struct GeneralSourceMetricDefs { | |||
// Source metrics | |||
pub(crate) capability: UIntGaugeVec, | |||
pub(crate) resume_upper: IntGaugeVec, | |||
pub(crate) inmemory_remap_bindings: UIntGaugeVec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still have some metrics we can keep track of right?
deferred_source_bindings and the remap trace lenths seem nice?
}; | ||
((output, result), from_time, diff) | ||
}) | ||
.capture_into(PusherCapture(reclock_pusher)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
soooo much nicer!
let resume_upper = if PartialOrder::less_equal(upper, &as_of) { | ||
Antichain::from_elem(Timestamp::minimum()) | ||
} else { | ||
let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_than(t)); | ||
source_upper.clear(); | ||
source_upper.update_iter( | ||
remap_updates[0..idx] | ||
.iter() | ||
.map(|(from_time, _, diff)| (from_time.clone(), *diff)), | ||
); | ||
source_upper.frontier().to_owned() | ||
}; | ||
source_resume_uppers.insert(*id, resume_upper); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know its basic now, but can this be abstracted into a function in the mz_storage::reclock
module?
5f2462c
to
be82541
Compare
be82541
to
15ac2c7
Compare
} else { | ||
let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t)); | ||
source_upper.clear(); | ||
source_upper.update_iter( | ||
remap_updates[0..idx] | ||
.iter() | ||
.map(|(from_time, _, diff)| (from_time.clone(), *diff)), | ||
); | ||
source_upper.frontier().to_owned() | ||
}; | ||
source_resume_uppers.insert(*id, resume_upper); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've lost the tests and comments associated with this logic due to removing the source_upper_at_frontier
method -- can we add back some sort of explanation comment for this? Similar to the previous one?
/// The conversion has the property that all messages that would be reclocked to times beyond
/// the provided `IntoTime` frontier will be beyond the returned `FromTime` frontier. This can
/// be used to compute a safe starting point to resume producing an `IntoTime` collection at a
/// particular frontier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, done!
15ac2c7
to
1e8e3b2
Compare
1e8e3b2
to
10abb1c
Compare
Motivation
With the new reclock implementation now merged, this PR integrates it into the ingestion pipeline and deletes the old one
Tips for reviewer
Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.