Skip to content

Commit

Permalink
Fix topic closed normally but still call closeFencedTopicForcefully. (
Browse files Browse the repository at this point in the history
apache#15196) (apache#15202)

Co-authored-by: druidliu <druidliu@tencent.com>

Fixes apache#15196.

If broker having conf `topicFencingTimeoutSeconds`>0, a topic is trigged closed and closed normally, `closeFencedTopicForcefully` should not be called.

Cancel fenced topic monitoring task if topic close normally, which cancel running `closeFencedTopicForcefully`.

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
  - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API: (yes / no)
  - The schema: (yes / no / don't know)
  - The default values of configurations: (yes / no)
  - The wire protocol: (yes / no)
  - The rest endpoints: (yes / no)
  - The admin cli options: (yes / no)
  - Anything that affects deployment: (yes / no / don't know)

Check the box below or label this PR directly.

Need to update docs?

- [ ] `doc-required`
- [x] `no-need-doc`
- [ ] `doc`
- [ ] `doc-added`

(cherry picked from commit e4a8de1)
  • Loading branch information
dragonls authored and lhotari committed Apr 22, 2022
1 parent 6981c6b commit a5f5798
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Expand Up @@ -1203,6 +1203,7 @@ public void closeComplete(Object ctx) {

brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic));
log.info("[{}] Topic closed", topic);
cancelFencedTopicMonitoringTask();
closeFuture.complete(null);
})
.exceptionally(ex -> {
Expand Down Expand Up @@ -2800,6 +2801,13 @@ public boolean isSystemTopic() {
return false;
}

private synchronized void cancelFencedTopicMonitoringTask() {
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
}

private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
Expand All @@ -2814,10 +2822,7 @@ private synchronized void fence() {

private synchronized void unfence() {
isFenced = false;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
cancelFencedTopicMonitoringTask();
}

private void closeFencedTopicForcefully() {
Expand Down
Expand Up @@ -2125,6 +2125,28 @@ public void testTopicFencingTimeout() throws Exception {
assertTrue((boolean) isClosingOrDeletingField.get(topic));
}

@Test
public void testTopicCloseFencingTimeout() throws Exception {
pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
Method fence = PersistentTopic.class.getDeclaredMethod("fence");
fence.setAccessible(true);
Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
fencedTopicMonitoringTaskField.setAccessible(true);

// create topic
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

// fence topic to init fencedTopicMonitoringTask
fence.invoke(topic);

// close topic
topic.close().get();
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
assertTrue(fencedTopicMonitoringTask.isDone());
assertTrue(fencedTopicMonitoringTask.isCancelled());
}

@Test
public void testGetDurableSubscription() throws Exception {
ManagedLedger mockLedger = mock(ManagedLedger.class);
Expand Down

0 comments on commit a5f5798

Please sign in to comment.