Skip to content

Commit

Permalink
Make the meaning of running status consistent
Browse files Browse the repository at this point in the history
- A running status is STARTING, STARTED, or STOPPING.

 Resolves spring-projects#1483
  • Loading branch information
lcmarvin committed May 12, 2022
1 parent 5bce152 commit ae522a9
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 11 deletions.
Expand Up @@ -85,10 +85,10 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
/**
* Convenience method to decide if a status indicates that work is in progress.
*
* @return true if the status is STARTING, STARTED
* @return true if the status is STARTING, STARTED, STOPPING
*/
public boolean isRunning() {
return this == STARTING || this == STARTED;
return this == STARTING || this == STARTED || this == STOPPING;
}

/**
Expand Down
Expand Up @@ -263,10 +263,10 @@ public StepExecution createStepExecution(String stepName) {
* Test if this {@link JobExecution} indicates that it is running.
* Note that this does not necessarily mean that it has been persisted.
*
* @return {@code true} if the end time is null and the start time is not null.
* @return {@code true} if the status is one of the running status.
*/
public boolean isRunning() {
return startTime != null && endTime == null;
return status.isRunning();
}

/**
Expand Down
Expand Up @@ -112,7 +112,7 @@ public JobExecution run(final Job job, final JobParameters jobParameters)
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning() || status == BatchStatus.STOPPING) {
if (status.isRunning()) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
} else if (status == BatchStatus.UNKNOWN) {
Expand Down
Expand Up @@ -85,7 +85,7 @@ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements
+ " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";

private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";
+ "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.STATUS in ('STARTING', 'STARTED', 'STOPPING') order by E.JOB_EXECUTION_ID desc";

private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?";

Expand Down
Expand Up @@ -123,7 +123,7 @@ public JobExecution createJobExecution(String jobName, JobParameters jobParamete

// check for running executions and find the last started
for (JobExecution execution : executions) {
if (execution.isRunning() || execution.isStopping()) {
if (execution.isRunning()) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ jobInstance);
}
Expand Down
Expand Up @@ -73,6 +73,7 @@ public void testIsRunning() {
assertFalse(BatchStatus.COMPLETED.isRunning());
assertTrue(BatchStatus.STARTED.isRunning());
assertTrue(BatchStatus.STARTING.isRunning());
assertTrue(BatchStatus.STOPPING.isRunning());
}

@Test
Expand Down
Expand Up @@ -57,13 +57,19 @@ public void testGetEndTime() {

/**
* Test method for
* {@link org.springframework.batch.core.JobExecution#getEndTime()}.
* {@link org.springframework.batch.core.JobExecution#isRunning()}.
*/
@Test
public void testIsRunning() {
execution.setStartTime(new Date());
execution.setStatus(BatchStatus.STARTING);
assertTrue(execution.isRunning());
execution.setEndTime(new Date(100L));
execution.setStatus(BatchStatus.STARTED);
assertTrue(execution.isRunning());
execution.setStatus(BatchStatus.STOPPING);
assertTrue(execution.isRunning());
execution.setStatus(BatchStatus.COMPLETED);
assertFalse(execution.isRunning());
execution.setStatus(BatchStatus.FAILED);
assertFalse(execution.isRunning());
}

Expand Down
@@ -0,0 +1,103 @@
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.launch;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.core.test.AbstractIntegrationTests;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.PlatformTransactionManager;

/**
* @author Marvin Deng
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"/simple-job-launcher-context.xml"})
public class SimpleJobLauncherIntegrationTests extends AbstractIntegrationTests {

@Autowired
private JobLauncher jobLauncher;

@Autowired
private JobRepository jobRepository;

@Autowired
private PlatformTransactionManager transactionManager;

@Autowired
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}

@Test
public void testLaunchWithSameParametersInMultiThreads() throws Exception {
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);

TaskletStep step = stepBuilderFactory.get("testStep")
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
.build();
Job job = jobBuilderFactory.get("testJob").start(step).build();
JobParameters jobParameters = new JobParametersBuilder()
.addDate("now", new Date())
.toJobParameters();

AtomicInteger succeedCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
final int count = 10;
CountDownLatch latch = new CountDownLatch(count);
ExecutorService executorService = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
executorService.submit(() -> {
try {
jobLauncher.run(job, jobParameters);
succeedCount.incrementAndGet();
}
catch (Exception e) {
failedCount.incrementAndGet();
}
finally {
latch.countDown();
}
});
}
executorService.shutdown();
latch.await();
Assert.assertEquals(1, succeedCount.intValue());
Assert.assertEquals(count - 1, failedCount.intValue());
}

}
Expand Up @@ -202,6 +202,7 @@ public void testFindRunningExecutions() {
exec.setCreateTime(new Date(0));
exec.setStartTime(new Date(1L));
exec.setEndTime(new Date(2L));
exec.setStatus(BatchStatus.COMPLETED);
exec.setLastUpdated(new Date(5L));
dao.saveJobExecution(exec);

Expand All @@ -212,9 +213,17 @@ public void testFindRunningExecutions() {
exec.setLastUpdated(new Date(5L));
dao.saveJobExecution(exec);

//Stopping JobExecution as status is STOPPING
exec = new JobExecution(jobInstance, jobParameters);
exec.setStartTime(new Date(3L));
exec.setStatus(BatchStatus.STOPPING);
exec.setLastUpdated(new Date(5L));
dao.saveJobExecution(exec);

//Running JobExecution as StartTime is populated but EndTime is null
exec = new JobExecution(jobInstance, jobParameters);
exec.setStartTime(new Date(2L));
exec.setStatus(BatchStatus.STARTED);
exec.setLastUpdated(new Date(5L));
exec.createStepExecution("step");
dao.saveJobExecution(exec);
Expand All @@ -228,7 +237,7 @@ public void testFindRunningExecutions() {

Set<JobExecution> values = dao.findRunningJobExecutions(exec.getJobInstance().getJobName());

assertEquals(1, values.size());
assertEquals(3, values.size());
JobExecution value = values.iterator().next();
assertEquals(exec, value);
assertEquals(5L, value.getLastUpdated().getTime());
Expand Down

0 comments on commit ae522a9

Please sign in to comment.