
fix: clustered stream recovery, ensure PurgeEx is executed instead of full purge #4197

Closed
wants to merge 2 commits

Conversation

Contributor

@MauriceVanVeen commented May 28, 2023

  • Link to issue, e.g. Resolves #NNN
  • Documentation added (if applicable)
  • Tests added
  • Branch rebased on top of current main (git pull --rebase origin main)
  • Changes squashed to a single commit (described here)
  • Build is green in Travis CI
  • You have certified that the contribution is your original work and that you license the work to the project under the Apache 2 license

Resolves #4196

Changes proposed in this pull request:

I believe this is caused by the following code:

// Ignore if we are recovering and we have already processed.
if isRecovering {
	if mset.State().FirstSeq <= sp.LastSeq {
		// Make sure all messages from the purge are gone.
		mset.store.Compact(sp.LastSeq + 1)
	}
	continue
}

Which was introduced in this PR: #1886

This was fine at the time, since at that point only full stream purges were supported, so this shortcut would skip the purge if it was identified that it had already happened.

However, since the introduction of more advanced purges in #2296, a purge can target the whole stream, specific subjects, a sequence, or an amount of messages to keep.
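
For reference, the purge request that accompanies such a proposal carries roughly the following options (a sketch from memory of the JetStream purge API; the struct and field names are assumptions, not copied from the source):

// Sketch of the purge options referred to above (names assumed, not verbatim).
// A nil request is treated as a full purge of the stream.
type JSApiStreamPurgeRequest struct {
	// Purge only messages matching this subject (wildcards allowed).
	Subject string `json:"filter,omitempty"`
	// Purge all messages up to, but not including, this sequence.
	Sequence uint64 `json:"seq,omitempty"`
	// Keep this many of the newest messages and purge the rest.
	Keep uint64 `json:"keep,omitempty"`
}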

But this shortcut would always perform a full stream purge on recovery, even if only a "partial" subject-based purge had been requested.

This would result in any JetStream stream (or KV/Object store) that used partial purges being fully purged upon restart of a node, leading to inconsistent state across nodes, which can be observed when switching leaders.

This PR removes this shortcut and relies on the default purge-handling behaviour; a test has been added as well.
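
Concretely, with the shortcut removed, a replayed purge proposal goes through the same code path as a live one. A minimal sketch of the idea (helper names are illustrative, not the exact functions in the server):

// Minimal sketch, assuming mset.purge applies a JSApiStreamPurgeRequest and a
// nil request means a full purge; not the exact code in jetstream_cluster.go.
func applyStreamPurge(mset *stream, sp *streamPurge, isRecovering bool) error {
	// Previously: if isRecovering { mset.store.Compact(sp.LastSeq + 1); continue }
	// which turned every replayed purge into a full purge of the stream.
	_, err := mset.purge(sp.Request) // honours subject / sequence / keep
	return err
}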

@MauriceVanVeen requested a review from a team as a code owner May 28, 2023 21:49
@@ -155,7 +155,6 @@ type consumerAssignment struct {
type streamPurge struct {
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
LastSeq uint64 `json:"last_seq"`
@MauriceVanVeen
Contributor Author

Was only used in the below isRecovering + Compact block, so have cleaned it up.

@MauriceVanVeen
Contributor Author

The TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots test failed in CI, but passes locally for me. Could this be a flapper, maybe?

@derekcollison
Member

If we have a purge and then add some new messages, and then try to process the purge again on a restart, it will remove more than it should.

I understand the fix you are proposing, but I think we possibly still need to protect against this condition, which is what I was trying to do IIRC.

@MauriceVanVeen
Contributor Author

Yeah, I have been thinking about this as well. This does fix all messages being purged, but there will still be some inconsistencies in certain cases…

For example, when replaying a purge with keep set to some value: if new messages have been published in the meantime, the replay will keep different messages than the first run did. Say the stream held sequences 1-10 and a purge with keep=5 removed 1-5; if 11-15 are published before the restart, replaying keep=5 would also remove 6-10, keeping a different set of messages than the first time. So I'd expect it to start introducing some differences.

@derekcollison
Member

Maybe keep the code that remembers LastSeq and the conditional on isRecovering(), and instead of calling Compact we call the extended version of purge with the last sequence.
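
A rough sketch of that suggestion, reusing the names from the snippet quoted in the PR description (the bounded-purge helper is hypothetical, not the final change):

// Rough sketch of the suggestion above; purgeUpTo is a hypothetical helper
// that re-applies the original request but never removes messages stored
// after sp.LastSeq.
if isRecovering {
	if mset.State().FirstSeq <= sp.LastSeq {
		if sp.Request != nil {
			// Partial purge: replay it, bounded by the original LastSeq.
			mset.purgeUpTo(sp.Request, sp.LastSeq)
		} else {
			// Full purge: compacting past LastSeq is still correct.
			mset.store.Compact(sp.LastSeq + 1)
		}
	}
	continue
}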

@MauriceVanVeen
Contributor Author

Was just thinking that as well, will try it out! 👍

@MauriceVanVeen
Contributor Author

MauriceVanVeen commented May 29, 2023

Have been tinkering quite a bit and found myself almost going into a refactor… so I'm taking a small step back to confirm I'm heading in the right direction.

See also the commit I made just now: c40a6a3

In a nutshell:

  • have re-introduced the isRecovering conditional; it now calls PurgeEx up to sp.LastSeq if sp.Request != nil, and otherwise falls back to mset.store.Compact(sp.LastSeq + 1) as before (see the sketch after this list)
  • have added a "maxSequence" to PurgeEx, which is set to sp.LastSeq in the case of a recovery and otherwise is always mset.lastSeq (this ensures the purge only happens up to and including maxSequence, but leaves any later messages alone in the case of a recovery)
  • have added some tests for the added maxSequence
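
A minimal sketch of how that bound is chosen (the extra PurgeEx parameter is the one proposed in c40a6a3, not the store API on main; mset.lastSeq and the call shape are assumptions based on the description above):

// Sketch of the recovery bound described in the list above; parameter name
// as proposed in the commit, not necessarily the final store interface.
maxSequence := mset.lastSeq()
if isRecovering {
	// During recovery, never purge past the sequence recorded when the
	// purge originally ran, so messages stored afterwards are left alone.
	maxSequence = sp.LastSeq
}
purged, err := mset.store.PurgeEx(sp.Request.Subject, sp.Request.Sequence, sp.Request.Keep, maxSequence)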

This works for purges on sequence and on keep, which now replay properly and are limited to the maxSequence / sp.LastSeq.

However… where it becomes a bit harder (and where I stopped short of a refactor to ask for feedback) is how to handle purging on a subject in combination with keep.
FilteredState(sseq, subj) is called, which returns (among other things) the FirstSeq and LastSeq for the specific subject.
This currently doesn't take into account that we could be replaying during recovery and want to limit the LastSeq to the "max sequence" captured by sp.LastSeq, so there is no support for that currently.

What could be next steps here?

derekcollison added a commit that referenced this pull request Jun 4, 2023
… being replayed. (#4212)

This is an extension to the excellent work by @MauriceVanVeen and his
original PR #4197 to fully resolve for all use cases.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4196
@derekcollison
Member

Awesome work! Thanks, fixed in #4212

Successfully merging this pull request may close these issues.

Clustered stream recovery, full stream purge is executed instead of PurgeEx