Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: wire up new reclock implementation #27058

Merged
merged 1 commit into from
Jun 3, 2024

Conversation

petrosagg
Copy link
Contributor

@petrosagg petrosagg commented May 13, 2024

Motivation

With the new reclock implementation now merged, this PR integrates it into the ingestion pipeline and deletes the old one

Tips for reviewer

Checklist

@petrosagg petrosagg requested review from a team and guswynn May 13, 2024 13:17
Copy link
Contributor

@guswynn guswynn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally happy with this (look at the minus red!!), a few nits!

Its unclear what the ci issues are...resumption frontier issues?

@@ -35,7 +35,6 @@ pub(crate) struct GeneralSourceMetricDefs {
// Source metrics
pub(crate) capability: UIntGaugeVec,
pub(crate) resume_upper: IntGaugeVec,
pub(crate) inmemory_remap_bindings: UIntGaugeVec,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still have some metrics we can keep track of right?

deferred_source_bindings and the remap trace lenths seem nice?

};
((output, result), from_time, diff)
})
.capture_into(PusherCapture(reclock_pusher));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

soooo much nicer!

Comment on lines 175 to 191
let resume_upper = if PartialOrder::less_equal(upper, &as_of) {
Antichain::from_elem(Timestamp::minimum())
} else {
let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_than(t));
source_upper.clear();
source_upper.update_iter(
remap_updates[0..idx]
.iter()
.map(|(from_time, _, diff)| (from_time.clone(), *diff)),
);
source_upper.frontier().to_owned()
};
source_resume_uppers.insert(*id, resume_upper);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know its basic now, but can this be abstracted into a function in the mz_storage::reclock module?

@petrosagg petrosagg marked this pull request as draft May 19, 2024 18:05
@petrosagg petrosagg marked this pull request as ready for review June 3, 2024 16:49
@petrosagg petrosagg requested a review from rjobanp June 3, 2024 16:49
Comment on lines +177 to +191
} else {
let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t));
source_upper.clear();
source_upper.update_iter(
remap_updates[0..idx]
.iter()
.map(|(from_time, _, diff)| (from_time.clone(), *diff)),
);
source_upper.frontier().to_owned()
};
source_resume_uppers.insert(*id, resume_upper);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've lost the tests and comments associated with this logic due to removing the source_upper_at_frontier method -- can we add back some sort of explanation comment for this? Similar to the previous one?

    /// The conversion has the property that all messages that would be reclocked to times beyond
    /// the provided `IntoTime` frontier will be beyond the returned `FromTime` frontier. This can
    /// be used to compute a safe starting point to resume producing an `IntoTime` collection at a
    /// particular frontier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, done!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants