Skip to content

Commit

Permalink
Use TopicPartition.topic for metrics (#235)
Browse files Browse the repository at this point in the history
* Use TopicPartition.topic for metrics

* Use variable and rename SubscriptionScope.topic to originTopic

* Use topicPartition.topic() for ThreadUtilizationMetrics
  • Loading branch information
tadashiya committed May 17, 2024
1 parent d792a13 commit 91fde46
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private void updateState(SubscriptionStateListener.State newState) {

private Set<String> subscribeTopics() {
return Stream.concat(
Stream.of(Optional.of(scope.topic()), scope.retryTopic())
Stream.of(Optional.of(scope.originTopic()), scope.retryTopic())
.filter(Optional::isPresent)
.map(Optional::get),
scope.shapingTopics().stream()).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private QuotaApplier quotaApplier(SubscriptionScope scope) {
(properties, new ByteArraySerializer(), new ByteArraySerializer()));
return new QuotaApplierImpl(
producerSupplier.apply(producerConfig),
perKeyQuotaConfig.callbackSupplier().apply(scope.topic()),
perKeyQuotaConfig.callbackSupplier().apply(scope.originTopic()),
scope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linecorp.decaton.processor.runtime.Property;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

/**
* This class is responsible for following portions:
Expand Down Expand Up @@ -57,10 +58,11 @@ protected AbstractSubPartitions(PartitionScope scope, Processors<?> processors)
shutdownTimeoutMillis = scope.props().get(
ProcessorProperties.CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS);
rateLimiter = new DynamicRateLimiter(rateProperty(scope));
TopicPartition topicPartition = scope.topicPartition();
Metrics metrics = Metrics.withTags(
"subscription", scope.subscriptionId(),
"topic", scope.topic(),
"partition", String.valueOf(scope.topicPartition().partition()));
"topic", topicPartition.topic(),
"partition", String.valueOf(topicPartition.partition()));
taskMetrics = metrics.new TaskMetrics();
schedulerMetrics = metrics.new SchedulerMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static SubPartitions createSubPartitions(PartitionScope scope, Processor
scope.props().get(ProcessorProperties.CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS),
metricsCtor.new CommitControlMetrics());
commitControl = new OutOfOrderCommitControl(scope.topicPartition(), capacity, offsetStateReaper);
if (scope.perKeyQuotaConfig().isPresent() && scope.topic().equals(scope.topicPartition().topic())) {
if (scope.perKeyQuotaConfig().isPresent() && scope.originTopic().equals(scope.topicPartition().topic())) {
perKeyQuotaManager = PerKeyQuotaManager.create(scope);
} else {
perKeyQuotaManager = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class PartitionScope extends SubscriptionScope {
private final TopicPartition topicPartition;

PartitionScope(SubscriptionScope parent, TopicPartition topicPartition) {
super(parent.subscriptionId(), parent.topic(), parent.subPartitionRuntime(),
super(parent.subscriptionId(), parent.originTopic(), parent.subPartitionRuntime(),
parent.retryConfig(), parent.perKeyQuotaConfig(), parent.props(),
parent.tracingProvider(), parent.maxPollRecords(), parent.subPartitionerSupplier());
this.topicPartition = topicPartition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Getter
public class SubscriptionScope {
private final String subscriptionId;
private final String topic;
private final String originTopic;
private final SubPartitionRuntime subPartitionRuntime;
private final Optional<RetryConfig> retryConfig;
private final Optional<PerKeyQuotaConfig> perKeyQuotaConfig;
Expand All @@ -48,11 +48,11 @@ public class SubscriptionScope {
private final SubPartitionerSupplier subPartitionerSupplier;

public Optional<String> retryTopic() {
return retryConfig.map(conf -> conf.retryTopicOrDefault(topic));
return retryConfig.map(conf -> conf.retryTopicOrDefault(originTopic));
}

public Set<String> shapingTopics() {
return perKeyQuotaConfig.map(conf -> conf.shapingTopicsSupplier().apply(topic))
return perKeyQuotaConfig.map(conf -> conf.shapingTopicsSupplier().apply(originTopic))
.orElse(Collections.emptySet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

@Slf4j
public class ThreadPoolSubPartitions extends AbstractSubPartitions {
Expand Down Expand Up @@ -67,10 +68,11 @@ public void addTask(TaskRequest request) {
SubPartition subPartition = subPartitions[threadId];
if (subPartition == null) {
ThreadScope threadScope = new ThreadScope(scope, threadId);
TopicPartition topicPartition = threadScope.topicPartition();
ThreadUtilizationMetrics metrics =
Metrics.withTags("subscription", threadScope.subscriptionId(),
"topic", threadScope.topic(),
"partition", String.valueOf(threadScope.topicPartition().partition()),
"topic", topicPartition.topic(),
"partition", String.valueOf(topicPartition.partition()),
"subpartition", String.valueOf(threadId))
.new ThreadUtilizationMetrics();
ExecutorService executor = createExecutorService(threadScope, metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private static ProcessorSubscription subscription(Consumer<byte[], byte[]> consu
DecatonProcessor<String> processor) {
SubscriptionScope scope = scope(tp.topic(), 0L);
ProcessorsBuilder<String> builder =
ProcessorsBuilder.consuming(scope.topic(),
ProcessorsBuilder.consuming(scope.originTopic(),
(byte[] bytes) -> new DecatonTask<>(
TaskMetadata.builder().build(),
new String(bytes), bytes));
Expand Down Expand Up @@ -276,7 +276,7 @@ public synchronized ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
scope,
consumer,
NoopQuotaApplier.INSTANCE,
ProcessorsBuilder.consuming(scope.topic(),
ProcessorsBuilder.consuming(scope.originTopic(),
(byte[] bytes) -> new DecatonTask<>(
TaskMetadata.builder().build(), "dummy", bytes))
.thenProcess(processor)
Expand Down Expand Up @@ -350,7 +350,7 @@ public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offse
scope,
consumer,
NoopQuotaApplier.INSTANCE,
ProcessorsBuilder.consuming(scope.topic(),
ProcessorsBuilder.consuming(scope.originTopic(),
(byte[] bytes) -> new DecatonTask<>(
TaskMetadata.builder().build(), "dummy", bytes))
.thenProcess((ctx, task) -> {
Expand Down

0 comments on commit 91fde46

Please sign in to comment.