Skip to content

Commit

Permalink
Fix topic closed normally but still call closeFencedTopicForcefully. (
Browse files Browse the repository at this point in the history
apache#15196) (apache#15202)

Co-authored-by: druidliu <druidliu@tencent.com>

Fixes apache#15196.

### Motivation

If broker having conf `topicFencingTimeoutSeconds`>0, a topic is trigged closed and closed normally, `closeFencedTopicForcefully` should not be called.

### Modifications

Cancel fenced topic monitoring task if topic close normally, which cancel running `closeFencedTopicForcefully`.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
  - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API: (yes / no)
  - The schema: (yes / no / don't know)
  - The default values of configurations: (yes / no)
  - The wire protocol: (yes / no)
  - The rest endpoints: (yes / no)
  - The admin cli options: (yes / no)
  - Anything that affects deployment: (yes / no / don't know)

### Documentation

Check the box below or label this PR directly.

Need to update docs?

- [ ] `doc-required`
- [x] `no-need-doc`
- [ ] `doc`
- [ ] `doc-added`

(cherry picked from commit e4a8de1)
  • Loading branch information
dragonls authored and lhotari committed Apr 22, 2022
1 parent 897cd38 commit 2a84c93
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Expand Up @@ -1271,6 +1271,7 @@ public void closeComplete(Object ctx) {

unregisterTopicPolicyListener();
log.info("[{}] Topic closed", topic);
cancelFencedTopicMonitoringTask();
closeFuture.complete(null);
})
.exceptionally(ex -> {
Expand Down Expand Up @@ -2934,6 +2935,13 @@ public boolean isPersistent() {
return true;
}

private synchronized void cancelFencedTopicMonitoringTask() {
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
}

private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
Expand All @@ -2948,10 +2956,7 @@ private synchronized void fence() {

private synchronized void unfence() {
isFenced = false;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
cancelFencedTopicMonitoringTask();
}

private void closeFencedTopicForcefully() {
Expand Down
Expand Up @@ -2159,6 +2159,28 @@ public void testTopicFencingTimeout() throws Exception {
assertTrue((boolean) isClosingOrDeletingField.get(topic));
}

@Test
public void testTopicCloseFencingTimeout() throws Exception {
pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
Method fence = PersistentTopic.class.getDeclaredMethod("fence");
fence.setAccessible(true);
Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
fencedTopicMonitoringTaskField.setAccessible(true);

// create topic
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

// fence topic to init fencedTopicMonitoringTask
fence.invoke(topic);

// close topic
topic.close().get();
assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
assertTrue(fencedTopicMonitoringTask.isDone());
assertTrue(fencedTopicMonitoringTask.isCancelled());
}

@Test
public void testGetDurableSubscription() throws Exception {
ManagedLedger mockLedger = mock(ManagedLedger.class);
Expand Down

0 comments on commit 2a84c93

Please sign in to comment.