Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Support dangling topic check and deletion in topic cleanup service. #974

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

haoxu07
Copy link
Contributor

@haoxu07 haoxu07 commented May 1, 2024

Summary, imperative, start upper case, don't end with a period

Add dangling topic checking functionality, TopicCleanupService will compare topics listed from kafka and topics from other type of pub sub client, if there is topic could not be found by kafka client but can be founded by other type of pub sub client, topic will be deleted by such cases:

  1. Real-time topic, if store is not hybrid, that topic will be deleted.
  2. Version topic, if store does not contain that version, the topic will be deleted.
  3. If there is no store available for that topic, the topic will be deleted.

Add a new config to enable this feature: CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND. If its value is >0 and admin client class is not kafka, the feature will be turned on. Otherwise, it will be disabled.

How was this PR tested?

CI, integration test will be in internal repo with other type of pub sub system.

Does this PR introduce any user-facing changes?

  • [*] No. You can skip the rest of this section.
  • [] Yes. Make sure to explain your proposed changes and call out the behavior change.

@haoxu07 haoxu07 force-pushed the danglingTopicDeletion branch 2 times, most recently from a53fd79 to ae70258 Compare May 1, 2024 21:01
@haoxu07 haoxu07 marked this pull request as ready for review May 1, 2024 21:06
@haoxu07 haoxu07 requested a review from sushantmane May 1, 2024 21:06
Comment on lines +467 to +471
int versionNum = Version.parseVersionFromKafkaTopicName(pubSubTopic.getName());
if (!store.containsVersion(versionNum)) {
LOGGER.info("Will remove dangling version topic {}", pubSubTopic);
topicsTobeCleanup.add(pubSubTopic);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We submitted a change recently to preserve VT data when DIV fatal issue happens. Can we double check that it won't be considered as dandling topics thus got deleted here?

@@ -74,13 +82,17 @@ public class TopicCleanupService extends AbstractVeniceService {
private boolean isRTTopicDeletionBlocked = false;
private boolean isLeaderControllerOfControllerCluster = false;
private long refreshQueueCycle = Time.MS_PER_MINUTE;
private Optional<PubSubAdminAdapter> apacheKafkaAdminAdapter;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit of using Optional<>? If there's not much value, we can use null if it's absent.

&& System.currentTimeMillis() - danglingTopicCleanupIntervalMs > recentDanglingTopicCleanupTime) {
List<PubSubTopic> pubSubTopics = collectDanglingTopics(topicsWithRetention);
if (!pubSubTopics.isEmpty()) {
LOGGER.info("Find topic discrepancy: topics not in kafka but in xinfra {}", pubSubTopics);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not expose the proprietary pubsub system name in OSS. And IMO it's more intuitive to say

LOGGER.warn("Find topic discrepancy: {} is not present among all pubsub systems"

String storeName = pubSubTopic.getStoreName();
String clusterDiscovered = admin.discoverCluster(storeName).getFirst();
Store store = admin.getStore(clusterDiscovered, storeName);
LOGGER.info("Find topic discrepancy case: {}", pubSubTopic);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest for these bad but not the end-of-world case, we use WARN. It's not good to have discrepancy among different pubsub ystems.

@@ -217,10 +248,12 @@ public void testCleanupVeniceTopics() throws ExecutionException {
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt"));
// Delete should be blocked by remote VT
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt"));
verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(5);
verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(6);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the existing test case has covered all 3 cases?

1. Real-time topic, if store is not hybrid, that topic will be deleted.
2. Version topic, if store does not contain that version, the topic will be deleted.
3. If there is no store available for that topic, the topic will be deleted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants