Skip to content

Commit

Permalink
Add Observation API
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak authored and lcmarvin committed Apr 16, 2022
1 parent 0582485 commit 74c3b15
Show file tree
Hide file tree
Showing 22 changed files with 670 additions and 35 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -55,6 +55,7 @@
<spring-retry.version>1.3.1</spring-retry.version>
<spring-integration.version>6.0.0-SNAPSHOT</spring-integration.version>
<micrometer.version>2.0.0-SNAPSHOT</micrometer.version>
<micrometer-tracing.version>1.0.0-SNAPSHOT</micrometer-tracing.version>
<jackson.version>2.13.1</jackson.version>

<!-- optional production dependencies -->
Expand Down
6 changes: 6 additions & 0 deletions spring-batch-core/pom.xml
Expand Up @@ -256,6 +256,12 @@
<version>${jakarta.inject-api.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-test</artifactId>
<version>${micrometer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Expand Up @@ -18,10 +18,12 @@

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import io.micrometer.api.instrument.LongTaskTimer;
import io.micrometer.api.instrument.Tag;
import io.micrometer.api.instrument.Timer;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.observation.Observation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
Expand Down Expand Up @@ -62,7 +64,7 @@
* @author Mahmoud Ben Hassine
*/
public abstract class AbstractJob implements Job, StepLocator, BeanNameAware,
InitializingBean {
InitializingBean, Observation.TagsProviderAware<BatchJobTagsProvider> {

protected static final Log logger = LogFactory.getLog(AbstractJob.class);

Expand All @@ -80,6 +82,8 @@ public abstract class AbstractJob implements Job, StepLocator, BeanNameAware,

private StepHandler stepHandler;

private BatchJobTagsProvider tagsProvider = new DefaultBatchJobTagsProvider();

/**
* Default constructor.
*/
Expand Down Expand Up @@ -304,8 +308,11 @@ public final void execute(JobExecution execution) {
LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs",
Tag.of("name", execution.getJobInstance().getJobName()));
LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
Timer.Sample timerSample = BatchMetrics.createTimerSample();
try {
Observation observation = BatchMetrics.createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName(), new BatchJobContext(execution))
.contextualName(execution.getJobInstance().getJobName())
.tagsProvider(this.tagsProvider)
.start();
try (Observation.Scope scope = observation.openScope()) {

jobParametersValidator.validate(execution.getJobParameters());

Expand Down Expand Up @@ -361,11 +368,7 @@ public final void execute(JobExecution execution) {
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
execution.setExitStatus(exitStatus.and(newExitStatus));
}

timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
Tag.of("name", execution.getJobInstance().getJobName()),
Tag.of("status", execution.getExitStatus().getExitCode())
));
stopObservation(execution, observation);
longTaskTimerSample.stop();
execution.setEndTime(new Date());

Expand All @@ -384,6 +387,19 @@ public final void execute(JobExecution execution) {

}

private void stopObservation(JobExecution execution, Observation observation) {
List<Throwable> throwables = execution.getFailureExceptions();
if (!throwables.isEmpty()) {
observation.error(mergedThrowables(throwables));
}
observation.stop();
}

private IllegalStateException mergedThrowables(List<Throwable> throwables) {
return new IllegalStateException(
throwables.stream().map(Throwable::toString).collect(Collectors.joining("\n")));
}

/**
* Convenience method for subclasses to delegate the handling of a specific
* step in the context of the current {@link JobExecution}. Clients of this
Expand Down Expand Up @@ -443,6 +459,11 @@ private void updateStatus(JobExecution jobExecution, BatchStatus status) {
jobRepository.update(jobExecution);
}

@Override
public void setTagsProvider(BatchJobTagsProvider tagsProvider) {
this.tagsProvider = tagsProvider;
}

@Override
public String toString() {
return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.job;

import io.micrometer.core.instrument.observation.Observation;

import org.springframework.batch.core.JobExecution;

public class BatchJobContext extends Observation.Context {

private final JobExecution jobExecution;

public BatchJobContext(JobExecution jobExecution) {
this.jobExecution = jobExecution;
}

public JobExecution getJobExecution() {
return jobExecution;
}

}
@@ -0,0 +1,101 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.job;

import io.micrometer.core.instrument.docs.DocumentedObservation;
import io.micrometer.core.instrument.docs.TagKey;

enum BatchJobObservation implements DocumentedObservation {

/**
* Observation created around a Job execution.
*/
BATCH_JOB_OBSERVATION {
@Override
public String getName() {
return "spring.batch.job";
}

@Override
public String getContextualName() {
return "%s";
}

@Override
public TagKey[] getLowCardinalityTagKeys() {
return JobLowCardinalityTags.values();
}

@Override
public TagKey[] getHighCardinalityTagKeys() {
return JobHighCardinalityTags.values();
}

@Override
public String getPrefix() {
return "spring.batch";
}
};

enum JobLowCardinalityTags implements TagKey {

/**
* Name of the Spring Batch job.
*/
JOB_NAME {
@Override
public String getKey() {
return "spring.batch.job.name";
}
},

/**
* Job status.
*/
JOB_STATUS {
@Override
public String getKey() {
return "spring.batch.job.status";
}
}

}

enum JobHighCardinalityTags implements TagKey {

/**
* ID of the Spring Batch job instance.
*/
JOB_INSTANCE_ID {
@Override
public String getKey() {
return "spring.batch.job.instanceId";
}
},

/**
* ID of the Spring Batch execution.
*/
JOB_EXECUTION_ID {
@Override
public String getKey() {
return "spring.batch.job.executionId";
}
}

}
}
@@ -0,0 +1,40 @@
/*
* Copyright 2006-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.job;

import io.micrometer.core.instrument.observation.Observation;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.StartLimitExceededException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.JobRestartException;

/**
* {@link Observation.TagsProvider} for {@link BatchJobContext}.
*
* @author Marcin Grzejszczak
*/
public interface BatchJobTagsProvider extends Observation.TagsProvider<BatchJobContext> {

@Override
default boolean supportsContext(Observation.Context context) {
return context instanceof BatchJobContext;
}
}
@@ -0,0 +1,42 @@
/*
* Copyright 2011-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.job;

import io.micrometer.core.instrument.Tags;

import org.springframework.batch.core.JobExecution;

/**
* Default {@link BatchJobTagsProvider} implementation.
*
* @author Marcin Grzejszczak
*/
public class DefaultBatchJobTagsProvider implements BatchJobTagsProvider {
@Override
public Tags getLowCardinalityTags(BatchJobContext context) {
JobExecution execution = context.getJobExecution();
return Tags.of(BatchJobObservation.JobLowCardinalityTags.JOB_NAME.of(execution.getJobInstance().getJobName()),
BatchJobObservation.JobLowCardinalityTags.JOB_STATUS.of(execution.getExitStatus().getExitCode()));
}

@Override
public Tags getHighCardinalityTags(BatchJobContext context) {
JobExecution execution = context.getJobExecution();
return Tags.of(BatchJobObservation.JobHighCardinalityTags.JOB_INSTANCE_ID.of(String.valueOf(execution.getJobInstance().getInstanceId())),
BatchJobObservation.JobHighCardinalityTags.JOB_EXECUTION_ID.of(String.valueOf(execution.getId())));
}

}
Expand Up @@ -20,10 +20,12 @@
import java.util.Date;
import java.util.concurrent.TimeUnit;

import io.micrometer.api.instrument.LongTaskTimer;
import io.micrometer.api.instrument.Metrics;
import io.micrometer.api.instrument.Tag;
import io.micrometer.api.instrument.Timer;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.observation.Observation;
import io.micrometer.core.instrument.observation.TimerObservationHandler;

import org.springframework.lang.Nullable;

Expand Down Expand Up @@ -66,6 +68,19 @@ public static Timer createTimer(String name, String description, Tag... tags) {
.register(Metrics.globalRegistry);
}

/**
* Create a new {@link Observation}. It's not started, you must
* explicitly call {@link Observation#start()} to start it.
*
* Remember to register the {@link TimerObservationHandler}
* via the {@code Metrics.globalRegistry.withTimerObservationHandler()}
* in the user code. Otherwise you won't observe any metrics.
* @return a new observation instance
*/
public static Observation createObservation(String name, Observation.Context context) {
return Observation.createNotStarted(name, context, Metrics.globalRegistry);
}

/**
* Create a new {@link Timer.Sample}.
* @return a new timer sample instance
Expand Down

0 comments on commit 74c3b15

Please sign in to comment.