[controller] Support dangling topic check and deletion in topic cleanup service. #974
int versionNum = Version.parseVersionFromKafkaTopicName(pubSubTopic.getName());
if (!store.containsVersion(versionNum)) {
LOGGER.info("Will remove dangling version topic {}", pubSubTopic);
topicsTobeCleanup.add(pubSubTopic);
}
We recently submitted a change to preserve VT data when a fatal DIV issue happens. Can we double-check that those topics won't be treated as dangling topics and deleted here?
@@ -74,13 +82,17 @@ public class TopicCleanupService extends AbstractVeniceService {
private boolean isRTTopicDeletionBlocked = false;
private boolean isLeaderControllerOfControllerCluster = false;
private long refreshQueueCycle = Time.MS_PER_MINUTE;
private Optional<PubSubAdminAdapter> apacheKafkaAdminAdapter;
Is there any benefit to using Optional<>? If there's not much value, we can use null if it's absent.
&& System.currentTimeMillis() - danglingTopicCleanupIntervalMs > recentDanglingTopicCleanupTime) {
List<PubSubTopic> pubSubTopics = collectDanglingTopics(topicsWithRetention);
if (!pubSubTopics.isEmpty()) {
LOGGER.info("Find topic discrepancy: topics not in kafka but in xinfra {}", pubSubTopics);
We should not expose the proprietary pubsub system name in OSS. Also, IMO it's more intuitive to say:
LOGGER.warn("Find topic discrepancy: {} is not present among all pubsub systems"
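For readers following the hunk above, here is a minimal sketch of the two pieces it combines: a time-based gate and a set difference between two topic listings. Everything in it is an assumption made for illustration, not the PR's actual code: the class and method names (`DanglingTopicGateSketch`, `isCleanupDue`, `onlyInOtherListing`) are hypothetical, topics are plain strings rather than `PubSubTopic`, and treating a non-positive interval as "feature off" is a guess at the gating rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DanglingTopicGateSketch {
  /**
   * Returns true when the check is enabled and the interval has elapsed since the last run.
   * Assumption: an interval <= 0 means the feature is disabled.
   */
  public static boolean isCleanupDue(long nowMs, long intervalMs, long lastCleanupMs) {
    return intervalMs > 0 && nowMs - intervalMs > lastCleanupMs;
  }

  /** Returns the topics that appear in otherTopics but not in kafkaTopics, in otherTopics order. */
  public static List<String> onlyInOtherListing(Set<String> kafkaTopics, List<String> otherTopics) {
    List<String> missingFromKafka = new ArrayList<>();
    for (String topic : otherTopics) {
      if (!kafkaTopics.contains(topic)) {
        missingFromKafka.add(topic);
      }
    }
    return missingFromKafka;
  }

  public static void main(String[] args) {
    // Hypothetical topic names, used only to exercise the two helpers.
    Set<String> kafkaTopics = new HashSet<>(Arrays.asList("storeA_v1", "storeA_rt"));
    List<String> otherTopics = Arrays.asList("storeA_v1", "storeB_v2");
    if (isCleanupDue(System.currentTimeMillis(), 60_000L, 0L)) {
      System.out.println("Topics missing from Kafka: " + onlyInOtherListing(kafkaTopics, otherTopics));
    }
  }
}
```

Topics returned by the difference are only candidates; whether each one is actually dangling still depends on the store metadata checks.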
String storeName = pubSubTopic.getStoreName();
String clusterDiscovered = admin.discoverCluster(storeName).getFirst();
Store store = admin.getStore(clusterDiscovered, storeName);
LOGGER.info("Find topic discrepancy case: {}", pubSubTopic);
I'd suggest that for these bad but not end-of-the-world cases, we use WARN. It's not good to have discrepancies among different pubsub systems.
@@ -217,10 +248,12 @@ public void testCleanupVeniceTopics() throws ExecutionException {
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt"));
// Delete should be blocked by remote VT
verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt"));
verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(5);
verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(6);
Do the existing test cases cover all 3 cases?
1. Real-time topic, if store is not hybrid, that topic will be deleted.
2. Version topic, if store does not contain that version, the topic will be deleted.
3. If there is no store available for that topic, the topic will be deleted.
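The three cases above can be sketched as a single decision function. This is only an illustration under stated assumptions: `DanglingTopicDecisionSketch`, `StoreInfo`, and `isDangling` are hypothetical names, `StoreInfo` is a stand-in for the real store metadata, and the `_rt` suffix and `_v<number>` suffix are assumed naming conventions for real-time and version topics.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DanglingTopicDecisionSketch {
  /** Minimal stand-in for store metadata; not the real Store interface. */
  public static final class StoreInfo {
    final boolean hybrid;
    final Set<Integer> versions;

    public StoreInfo(boolean hybrid, Set<Integer> versions) {
      this.hybrid = hybrid;
      this.versions = versions;
    }
  }

  /** Returns true if the topic falls into one of the three deletion cases. */
  public static boolean isDangling(String topicName, StoreInfo store) {
    if (store == null) {
      return true; // Case 3: no store exists for this topic.
    }
    if (topicName.endsWith("_rt")) {
      return !store.hybrid; // Case 1: real-time topic of a non-hybrid store.
    }
    int idx = topicName.lastIndexOf("_v");
    if (idx >= 0) {
      try {
        int version = Integer.parseInt(topicName.substring(idx + 2));
        return !store.versions.contains(version); // Case 2: version unknown to the store.
      } catch (NumberFormatException e) {
        return false; // Not a version topic after all; leave it alone.
      }
    }
    return false; // Unrecognized topic type: be conservative and keep it.
  }

  public static void main(String[] args) {
    StoreInfo batchStore = new StoreInfo(false, new HashSet<>(Arrays.asList(1, 2)));
    System.out.println(isDangling("storeA_rt", batchStore));
    System.out.println(isDangling("storeA_v2", batchStore));
    System.out.println(isDangling("storeA_v3", batchStore));
    System.out.println(isDangling("storeB_v1", null));
  }
}
```

A test that covers all three cases would need at least one topic per branch, plus one topic per branch that must be kept.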
Add dangling topic checking functionality. TopicCleanupService will compare the topics listed from Kafka with the topics listed by the other type of pubsub client. If a topic cannot be found by the Kafka client but can be found by the other type of pubsub client, the topic will be deleted in the following cases:
1. Real-time topic: the store is not hybrid.
2. Version topic: the store does not contain that version.
3. No store is available for that topic.
Add a new config to enable this feature: CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND. If its value is > 0 and the admin client class is not Kafka, the feature is turned on. Otherwise, it is disabled.
How was this PR tested?
CI. The integration test will be in the internal repo, with the other type of pubsub system.
Does this PR introduce any user-facing changes?