forked from spring-projects/spring-kafka
-
Notifications
You must be signed in to change notification settings - Fork 3
/
ListenerContainerPauseService.java
148 lines (129 loc) · 5.36 KB
/
ListenerContainerPauseService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Optional;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
/**
* Service for pausing and resuming of {@link MessageListenerContainer}.
*
* @author Jan Marincek
* @author Gary Russell
* @since 2.9
*/
public class ListenerContainerPauseService {
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ListenerContainerPauseService.class));
@Nullable
private final ListenerContainerRegistry registry;
private final TaskScheduler scheduler;
/**
* Create an instance with the provided registry and scheduler.
* @param registry the registry or null.
* @param scheduler the scheduler.
*/
public ListenerContainerPauseService(@Nullable ListenerContainerRegistry registry, TaskScheduler scheduler) {
Assert.notNull(scheduler, "'scheduler' cannot be null");
this.registry = registry;
this.scheduler = scheduler;
}
/**
* Pause the listener by given id.
* Checks if the listener has already been requested to pause.
* Sets executor schedule for resuming the same listener after pauseDuration.
* @param listenerId the id of the listener
* @param pauseDuration duration between pause() and resume() actions
*/
public void pause(String listenerId, Duration pauseDuration) {
Assert.notNull(this.registry, "Pause by id is only supported when a registry is provided");
getListenerContainer(listenerId)
.ifPresent(messageListenerContainer -> pause(messageListenerContainer, pauseDuration));
}
/**
* Pause the listener by given container instance. Checks if the listener has already
* been requested to pause. Sets executor schedule for resuming the same listener
* after pauseDuration.
* @param messageListenerContainer the listener container
* @param pauseDuration duration between pause() and resume() actions
*/
public void pause(MessageListenerContainer messageListenerContainer, Duration pauseDuration) {
if (messageListenerContainer.isPauseRequested()) {
LOGGER.debug(() -> "Container " + messageListenerContainer + " already has pause requested");
}
else {
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + ", resume scheduled for "
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
messageListenerContainer.pause();
this.scheduler.schedule(() -> {
LOGGER.debug(() -> "Pausing container " + messageListenerContainer);
resume(messageListenerContainer);
}, resumeAt);
}
}
/**
* Pause consumption from a given partition for the duration.
* @param messageListenerContainer the container.
* @param partition the partition.
* @param pauseDuration the duration.
*/
public void pausePartition(MessageListenerContainer messageListenerContainer, TopicPartition partition,
Duration pauseDuration) {
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
LOGGER.debug(() -> "Pausing container: " + messageListenerContainer + " partition: " + partition
+ ", resume scheduled for "
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
messageListenerContainer.pausePartition(partition);
this.scheduler.schedule(() -> {
LOGGER.debug(() -> "Resuming container: " + messageListenerContainer + " partition: " + partition);
messageListenerContainer.resumePartition(partition);
}, resumeAt);
}
/**
* Resume the listener container by given id.
* @param listenerId the id of the listener
*/
public void resume(String listenerId) {
Assert.notNull(this.registry, "Resume by id is only supported when a registry is provided");
getListenerContainer(listenerId).ifPresent(this::resume);
}
/**
* Resume the listener container.
* @param messageListenerContainer the listener container
*/
public void resume(MessageListenerContainer messageListenerContainer) {
if (messageListenerContainer.isPauseRequested()) {
LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
messageListenerContainer.resume();
}
else {
LOGGER.debug(() -> "Container " + messageListenerContainer + " was not paused");
}
}
private Optional<MessageListenerContainer> getListenerContainer(String listenerId) {
MessageListenerContainer messageListenerContainer = this.registry.getListenerContainer(listenerId);
if (messageListenerContainer == null) {
LOGGER.warn(() -> "MessageListenerContainer " + listenerId + " does not exists");
}
return Optional.ofNullable(messageListenerContainer);
}
}