From ae522a9b3d521957ae5667d212f0de6b15794180 Mon Sep 17 00:00:00 2001 From: Marvin Deng Date: Wed, 11 May 2022 23:50:19 +0800 Subject: [PATCH] Make the meaning of running status consistent - A running status is STARTING, STARTED, or STOPPING. Resolves #1483 --- .../batch/core/BatchStatus.java | 4 +- .../batch/core/JobExecution.java | 4 +- .../launch/support/SimpleJobLauncher.java | 2 +- .../repository/dao/JdbcJobExecutionDao.java | 2 +- .../support/SimpleJobRepository.java | 2 +- .../batch/core/BatchStatusTests.java | 1 + .../batch/core/JobExecutionTests.java | 12 +- .../SimpleJobLauncherIntegrationTests.java | 103 ++++++++++++++++++ .../dao/AbstractJobExecutionDaoTests.java | 11 +- 9 files changed, 130 insertions(+), 11 deletions(-) create mode 100644 spring-batch-core/src/test/java/org/springframework/batch/core/launch/SimpleJobLauncherIntegrationTests.java diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java b/spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java index c548a1d854..73aefe9dbe 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java @@ -85,10 +85,10 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) { /** * Convenience method to decide if a status indicates that work is in progress. * - * @return true if the status is STARTING, STARTED + * @return true if the status is STARTING, STARTED, STOPPING */ public boolean isRunning() { - return this == STARTING || this == STARTED; + return this == STARTING || this == STARTED || this == STOPPING; } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/JobExecution.java b/spring-batch-core/src/main/java/org/springframework/batch/core/JobExecution.java index bbe83c80ca..56f5251258 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/JobExecution.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/JobExecution.java @@ -263,10 +263,10 @@ public StepExecution createStepExecution(String stepName) { * Test if this {@link JobExecution} indicates that it is running. * Note that this does not necessarily mean that it has been persisted. * - * @return {@code true} if the end time is null and the start time is not null. + * @return {@code true} if the status is one of the running status. */ public boolean isRunning() { - return startTime != null && endTime == null; + return status.isRunning(); } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobLauncher.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobLauncher.java index 60a1847e96..f12c5c33ef 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobLauncher.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobLauncher.java @@ -112,7 +112,7 @@ public JobExecution run(final Job job, final JobParameters jobParameters) */ for (StepExecution execution : lastExecution.getStepExecutions()) { BatchStatus status = execution.getStatus(); - if (status.isRunning() || status == BatchStatus.STOPPING) { + if (status.isRunning()) { throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + lastExecution); } else if (status == BatchStatus.UNKNOWN) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java index 5cbb17b0db..786e3e2c17 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java @@ -85,7 +85,7 @@ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " - + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; + + "E.JOB_INSTANCE_ID from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.STATUS in ('STARTING', 'STARTED', 'STOPPING') order by E.JOB_EXECUTION_ID desc"; private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?"; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java index 6b8ebf037a..04b0a90600 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java @@ -123,7 +123,7 @@ public JobExecution createJobExecution(String jobName, JobParameters jobParamete // check for running executions and find the last started for (JobExecution execution : executions) { - if (execution.isRunning() || execution.isStopping()) { + if (execution.isRunning()) { throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + jobInstance); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/BatchStatusTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/BatchStatusTests.java index 43c0dfd05d..46ebef77d7 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/BatchStatusTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/BatchStatusTests.java @@ -73,6 +73,7 @@ public void testIsRunning() { assertFalse(BatchStatus.COMPLETED.isRunning()); assertTrue(BatchStatus.STARTED.isRunning()); assertTrue(BatchStatus.STARTING.isRunning()); + assertTrue(BatchStatus.STOPPING.isRunning()); } @Test diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/JobExecutionTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/JobExecutionTests.java index 927a857104..934da82125 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/JobExecutionTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/JobExecutionTests.java @@ -57,13 +57,19 @@ public void testGetEndTime() { /** * Test method for - * {@link org.springframework.batch.core.JobExecution#getEndTime()}. + * {@link org.springframework.batch.core.JobExecution#isRunning()}. */ @Test public void testIsRunning() { - execution.setStartTime(new Date()); + execution.setStatus(BatchStatus.STARTING); assertTrue(execution.isRunning()); - execution.setEndTime(new Date(100L)); + execution.setStatus(BatchStatus.STARTED); + assertTrue(execution.isRunning()); + execution.setStatus(BatchStatus.STOPPING); + assertTrue(execution.isRunning()); + execution.setStatus(BatchStatus.COMPLETED); + assertFalse(execution.isRunning()); + execution.setStatus(BatchStatus.FAILED); assertFalse(execution.isRunning()); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/SimpleJobLauncherIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/SimpleJobLauncherIntegrationTests.java new file mode 100644 index 0000000000..cf0c235382 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/SimpleJobLauncherIntegrationTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2006-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.launch; + +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.sql.DataSource; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.core.test.AbstractIntegrationTests; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * @author Marvin Deng + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations = {"/simple-job-launcher-context.xml"}) +public class SimpleJobLauncherIntegrationTests extends AbstractIntegrationTests { + + @Autowired + private JobLauncher jobLauncher; + + @Autowired + private JobRepository jobRepository; + + @Autowired + private PlatformTransactionManager transactionManager; + + @Autowired + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Test + public void testLaunchWithSameParametersInMultiThreads() throws Exception { + JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository); + StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager); + + TaskletStep step = stepBuilderFactory.get("testStep") + .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED) + .build(); + Job job = jobBuilderFactory.get("testJob").start(step).build(); + JobParameters jobParameters = new JobParametersBuilder() + .addDate("now", new Date()) + .toJobParameters(); + + AtomicInteger succeedCount = new AtomicInteger(0); + AtomicInteger failedCount = new AtomicInteger(0); + final int count = 10; + CountDownLatch latch = new CountDownLatch(count); + ExecutorService executorService = Executors.newFixedThreadPool(count); + for (int i = 0; i < count; i++) { + executorService.submit(() -> { + try { + jobLauncher.run(job, jobParameters); + succeedCount.incrementAndGet(); + } + catch (Exception e) { + failedCount.incrementAndGet(); + } + finally { + latch.countDown(); + } + }); + } + executorService.shutdown(); + latch.await(); + Assert.assertEquals(1, succeedCount.intValue()); + Assert.assertEquals(count - 1, failedCount.intValue()); + } + +} diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractJobExecutionDaoTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractJobExecutionDaoTests.java index 61b645458b..ea845b03d9 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractJobExecutionDaoTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractJobExecutionDaoTests.java @@ -202,6 +202,7 @@ public void testFindRunningExecutions() { exec.setCreateTime(new Date(0)); exec.setStartTime(new Date(1L)); exec.setEndTime(new Date(2L)); + exec.setStatus(BatchStatus.COMPLETED); exec.setLastUpdated(new Date(5L)); dao.saveJobExecution(exec); @@ -212,9 +213,17 @@ public void testFindRunningExecutions() { exec.setLastUpdated(new Date(5L)); dao.saveJobExecution(exec); + //Stopping JobExecution as status is STOPPING + exec = new JobExecution(jobInstance, jobParameters); + exec.setStartTime(new Date(3L)); + exec.setStatus(BatchStatus.STOPPING); + exec.setLastUpdated(new Date(5L)); + dao.saveJobExecution(exec); + //Running JobExecution as StartTime is populated but EndTime is null exec = new JobExecution(jobInstance, jobParameters); exec.setStartTime(new Date(2L)); + exec.setStatus(BatchStatus.STARTED); exec.setLastUpdated(new Date(5L)); exec.createStepExecution("step"); dao.saveJobExecution(exec); @@ -228,7 +237,7 @@ public void testFindRunningExecutions() { Set values = dao.findRunningJobExecutions(exec.getJobInstance().getJobName()); - assertEquals(1, values.size()); + assertEquals(3, values.size()); JobExecution value = values.iterator().next(); assertEquals(exec, value); assertEquals(5L, value.getLastUpdated().getTime());