JobOperator#stop can not stop JobExecution correctly in some cases #4064

Closed
lcmarvin opened this issue Feb 13, 2022 · 1 comment
Labels
for: backport-to-4.3.x Issues that will be back-ported to the 4.3.x line has: minimal-example Bug reports that provide a minimal complete reproducible example in: core type: bug

lcmarvin commented Feb 13, 2022

Bug description
JobOperator#stop does not stop a JobExecution correctly when the stop command is executed after the step finishes but before the job finishes. The JobExecution ends up with status STOPPING instead of STOPPED.

Since STOPPING is a running state, I think this stop command does not stop the JobExecution correctly.

Steps to reproduce

  1. Build a tasklet or chunk step with an ItemStream registered through AbstractTaskletStepBuilder#stream. The ItemStream should override the close method.
  2. Build a simple job with this step.
  3. Launch the job in one thread.
  4. Stop the job execution in another thread using JobOperator#stop. Make sure this call executes after the step finishes but before the job finishes. (The example below shows one way to do this.)
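
The ordering required in step 4 can be forced with two latches. The following is a minimal, Spring-free sketch of that handshake only; the class name, method name and event strings are made up for illustration, and the "worker" and "stopper" threads merely stand in for the job thread and the thread that calls JobOperator#stop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the two-latch handshake; not Spring Batch code.
public class LatchHandshakeSketch {

    /** Runs both threads and returns the events in the order they occurred. */
    static List<String> runHandshake() throws Exception {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch startStopLatch = new CountDownLatch(1);
        CountDownLatch finishStopLatch = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Stands in for the job thread: the step's work is done, the job is not.
            Future<?> worker = pool.submit(() -> {
                events.add("step finished");
                startStopLatch.countDown();   // allow the stop request to start
                finishStopLatch.await();      // block until the stop request is done
                events.add("job finished");
                return null;
            });
            // Stands in for the thread that would call JobOperator#stop.
            Future<?> stopper = pool.submit(() -> {
                startStopLatch.await();       // wait until the step has finished
                events.add("stop requested");
                finishStopLatch.countDown();  // let the job thread continue
                return null;
            });
            worker.get(5, TimeUnit.SECONDS);
            stopper.get(5, TimeUnit.SECONDS);
        }
        finally {
            pool.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        // prints [step finished, stop requested, job finished]
        System.out.println(runHandshake());
    }
}
```

In the reproducer, the worker side of this handshake lives in the close method of the ItemStream, which is why the stream has to override close.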

Expected behavior
JobExecution ends up with status STOPPED.

Actual behavior
JobExecution ends up with status STOPPING.

Minimal Complete Reproducible example
The following example can be run under src/test/java/org/springframework/batch/test in the spring-batch-test module.

package org.springframework.batch.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.sql.DataSource;

import org.junit.Test;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;

import static org.junit.Assert.assertEquals;

public class StopOperatorTests {

	static JobOperator jobOperator;

	static CountDownLatch startStopLatch = new CountDownLatch(1);

	static CountDownLatch finishStopLatch = new CountDownLatch(1);

	/**
	 * Uses two signals to simulate a stop command that is executed after the step
	 * finishes but before the job finishes.
	 *
	 * @throws Exception if launching the job or waiting for its result fails
	 */
	@Test
	public void testStop() throws Exception {
		ApplicationContext context = new AnnotationConfigApplicationContext(StopJobConfiguration.class);

		JobLauncherTestUtils testUtils = context.getBean(JobLauncherTestUtils.class);

		jobOperator = context.getBean(JobOperator.class);

		ExecutorService executorService = Executors.newFixedThreadPool(2);

		// Launch job in one thread
		Future<JobExecution> future = executorService.submit(() -> {
			try {
				return testUtils.launchJob();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		});

		// Stop this job execution in another thread
		executorService.submit(() -> {
			try {
				// Wait for the startStop signal
				startStopLatch.await();
				jobOperator.stop(0L);
				// Send the finishStop signal
				finishStopLatch.countDown();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		});

		// Expected: this job execution can be stopped
		assertEquals(BatchStatus.STOPPED, future.get().getStatus());
	}

	@Configuration
	@EnableBatchProcessing
	static class StopJobConfiguration {
		@Autowired
		public JobBuilderFactory jobBuilderFactory;

		@Autowired
		public StepBuilderFactory stepBuilderFactory;

		@Bean
		public JobRegistry jobRegistry() {
			return new MapJobRegistry();
		}

		@Bean
		public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
			JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
			jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry());
			return jobRegistryBeanPostProcessor;
		}

		@Bean
		public JobOperator jobOperator(JobLauncher jobLauncher, JobRepository jobRepository,
				JobExplorer jobExplorer, JobRegistry jobRegistry) {
			SimpleJobOperator jobOperator = new SimpleJobOperator();
			jobOperator.setJobExplorer(jobExplorer);
			jobOperator.setJobLauncher(jobLauncher);
			jobOperator.setJobRegistry(jobRegistry);
			jobOperator.setJobRepository(jobRepository);
			return jobOperator;
		}

		@Bean
		public Step step() {
			return stepBuilderFactory.get("step")
					.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
					.stream(new MockStream())
					.build();
		}

		@Bean
		public Job job() {
			return jobBuilderFactory.get("job").start(step()).build();
		}

		@Bean
		public JobLauncherTestUtils testUtils() {
			JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
			jobLauncherTestUtils.setJob(job());
			return jobLauncherTestUtils;
		}

		@Bean
		public DataSource dataSource() {
			return new EmbeddedDatabaseBuilder()
					.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
					.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
					.generateUniqueName(true)
					.build();
		}
	}

	static class MockStream extends ItemStreamSupport {

		@Override
		public void close() {
			try {
				// Send the startStop signal
				startStopLatch.countDown();
				// Wait for the finishStop signal
				finishStopLatch.await();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		}
	}
}

Or refer to this link to see the source code.

Running this test should produce the following output.

java.lang.AssertionError: 
Expected :STOPPED
Actual   :STOPPING
@lcmarvin lcmarvin added status: waiting-for-triage Issues that we did not analyse yet type: bug labels Feb 13, 2022

lcmarvin commented Feb 15, 2022

Please refer to the PR #4067

@fmbenhassine fmbenhassine added in: core has: minimal-example Bug reports that provide a minimal complete reproducible example for: backport-to-4.3.x Issues that will be back-ported to the 4.3.x line and removed status: waiting-for-triage Issues that we did not analyse yet labels Apr 8, 2022
@fmbenhassine fmbenhassine added this to the 5.0.0-M3 milestone Apr 8, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M3, 5.0.0-M4 May 17, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M4, 5.0.0-M5 Jul 20, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M5, 5.0.0-M6 Aug 24, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M6, 5.0.0-M7 Sep 21, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M7, 5.0.0-M8 Oct 4, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M8, 5.0.0-RC1 Oct 12, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-RC1, 5.0.0-RC2 Oct 20, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-RC2, 5.0.0 Nov 8, 2022