-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] Exchange data cleanup improvement #698
base: master
Are you sure you want to change the base?
Conversation
2f33a4a
to
a54babd
Compare
queueName, position.getLedgerId(), position.getEntryId())); | ||
} | ||
} | ||
FutureUtil.waitForAll(futures).thenRun(this::scheduleExchangeClearTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When (router.getExchange().markDeleteAsync) throws an exception, the task will stop. Replace (thenRun) with (whenComplete)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
Fixed, please take a look again, thanks.
Motivation
The PR #619 changed the queue cursor to be durable, this will cause the exchange topic can't be cleaned up completely because one queue only handles part of the exchange data, and only these index positions could be acknowledged.
At checkpoint1, get the route mark delete position of exchange1 and exchange2, then get the last confirm position of the queue. The exchange1 route mark delete pos is 1:2, the exchange2 mark delete pos is 2:2, and the LAC of the queue may be equal with 3:2 or greater than 3:2 (because the routing message process is still running), when the mark delete pos of the queue reach the LAC at checkpoint1, it indicates that the index message 3:0, 3:1 and 3:2 were all acked, so we can ack the exchange1 with pos 1:2, and ack the exchange2 with pos 2:2.
Modifications
Add exchange data cleanup scheduled task for each queue.
Remove useless unAckMessages in
AmqpConsumer
.Verifying this change
Add verification to make sure exchange data could be cleaned up completely.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below.
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)