
partitioned-batch-job raises NPE when used with multiple partition handlers #793

Open
nicolasduminil opened this issue Aug 6, 2021 · 15 comments

Comments

@nicolasduminil

nicolasduminil commented Aug 6, 2021

Hello,

I modified the partitioned-batch-job sample by adding the following method:

  @Bean
  public PartitionHandler partitionHandler2(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository)
  {
    Resource resource = this.resourceLoader.getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT");
    DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");
    return partitionHandler;
  }

Running the job raises the following exception:

2021-08-06 13:12:47.071 DEBUG 13084 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Creating: TaskExecution{executionId=149, parentExecutionId=null, exitCode=null, taskName='null', startTime=null, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2021-08-06 13:12:47.074 ERROR 13084 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step1 in job partitionedJob-494091066

java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.3.jar:2.3.3]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.3.jar:2.3.3]
    ...

Please advise.

Many thanks in advance.

Just to mention that the NPE is raised at line 347 of the DeployerPartitionHandler class because the taskExecution field is null. This happens as soon as there are two or more partition handlers in the job. Using different flows doesn't change anything.

@danilko

danilko commented Aug 6, 2021

Thanks for opening this issue; I ran into a similar problem.

I thought I'd add more information in the hope that it helps with the investigation.

The same NPE also occurs if the partitionHandler is declared with @StepScope/@JobScope, or is otherwise created later (not as a plain @Bean).

Below is a Stack Overflow question I created, with sample code for each attempt:
https://stackoverflow.com/questions/68647761/spring-batch-with-multi-step-spring-cloud-task-partitionhandler-for-remote-p

Summary of issue

Spring Batch job 1 (receives job parameters carrying the settings for step 1 and the settings for step 2)

    Step 1 -> remote partition (partition handler (CPU/memory for step 1 + grid) + partitioner) with settings from step 1 (job configuration or step configuration)

    Step 2 -> remote partition (partition handler (CPU/memory for step 2 + grid) + partitioner) with settings from step 2 (job configuration or step configuration, different from step 1)

The goal is to run different steps with different k8s settings (such as CPU/memory/grid).


Attempts:

  1. Create two partition handlers (partitionHandlerReader + partitionHandlerProcessor) and their corresponding launchers (LauncherReader + LauncherProcessor)

The complete project can be found at
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_1_two_partitionhandlers

The configuration is simplified into a single class:
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_1_two_partitionhandlers/src/main/java/com/example/batchprocessing/BatchConfiguration.java

  2. Use one PartitionHandler + one TaskLauncher, but with @StepScope for late binding, so that settings can change dynamically based on the step and job setup

The complete project can be found at
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope

The configuration is simplified into a single class:
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope/src/main/java/com/example/batchprocessing/BatchConfiguration.java

Both attempts produce the following result (full trace in the repositories above):

When the job is triggered, it fails (it seems to get past initial startup, but errors out during execution).

The error below only occurs when there are multiple PartitionHandler beans, or when the bean is @StepScope or @JobScope:


java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]

Full Log

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.6)

2021-08-06 11:24:29.242  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Starting BatchProcessingApplication v0.0.1-SNAPSHOT using Java 11.0.7 on localhost.localdomain with PID 90294 (/home/danilko/IdeaProjects/partition/target/batchprocessing-0.0.1-SNAPSHOT.jar started by danilko in /home/danilko/IdeaProjects/partition)
2021-08-06 11:24:29.244  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : The following profiles are active: controller
2021-08-06 11:24:29.790  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-06 11:24:29.794  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-06 11:24:29.797  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-06 11:24:29.833  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.959  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$83e6c2be] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.968  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$cc3cccc1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:30.093  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-08-06 11:24:30.160  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-08-06 11:24:30.724  INFO 90294 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-08-06 11:24:30.736  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-06 11:24:30.974  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Started BatchProcessingApplication in 2.024 seconds (JVM running for 2.366)
2021-08-06 11:24:30.975  INFO 90294 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2021-08-06 11:24:31.010  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionedJob-1538890488]] launched with the following parameters: [{}]
Set readerGridSize == 1
2021-08-06 11:24:31.020  INFO 90294 --- [           main] o.s.c.t.b.l.TaskBatchExecutionListener   : The job execution id 22 was run within the task execution 54
2021-08-06 11:24:31.046  INFO 90294 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionReaderStep]
2021-08-06 11:24:31.101 ERROR 90294 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step partitionReaderStep in job partitionedJob-1538890488

java.lang.NullPointerException: null
	at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
	at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
	at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at com.sun.proxy.$Proxy65.handle(Unknown Source) ~[na:na]
	at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.7.jar!/:5.3.7]
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
	at com.sun.proxy.$Proxy51.run(Unknown Source) ~[na:na]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:799) ~[spring-boot-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:789) ~[spring-boot-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:346) ~[spring-boot-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.6.jar!/:2.4.6]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.6.jar!/:2.4.6]
	at com.example.batchprocessing.BatchProcessingApplication.main(BatchProcessingApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]


Study/Reference:
Most tutorials I found online only involve a single partitioned step.
https://dataflow.spring.io/docs/feature-guides/batch/partitioning/

Thanks in advance for any information or help.

@danilko

danilko commented Aug 6, 2021

The symptom seems to be that taskExecution is null when the PartitionHandler is not declared as a plain @Bean alongside @EnableTask

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 347

							String.format("%s_%s_%s", this.taskExecution.getTaskName(), // <-- the NPE seems to come from this line
									workerStepExecution.getJobExecution().getJobInstance()
											.getJobName(),
									workerStepExecution.getStepName())));
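
To make the failure mode concrete, here is a minimal plain-Java sketch of that expression. It is not the actual DeployerPartitionHandler code: `TaskExecutionStub` and `WorkerTaskNameSketch` are hypothetical stand-ins, and the sample values are only illustrative. It shows that the formatted name is built from three parts, and that dereferencing a null taskExecution is what throws.

```java
// Hypothetical stand-in for the real TaskExecution; only the task name matters here.
class TaskExecutionStub {
    private final String taskName;

    TaskExecutionStub(String taskName) {
        this.taskName = taskName;
    }

    String getTaskName() {
        return taskName;
    }
}

class WorkerTaskNameSketch {
    // Mirrors the "%s_%s_%s" expression quoted above: task name, job name, step name.
    static String workerTaskName(TaskExecutionStub taskExecution, String jobName, String stepName) {
        // If taskExecution is null, getTaskName() throws a NullPointerException right here.
        return String.format("%s_%s_%s", taskExecution.getTaskName(), jobName, stepName);
    }

    public static void main(String[] args) {
        TaskExecutionStub parent = new TaskExecutionStub("application");
        System.out.println(workerTaskName(parent, "partitionedJob", "workerStep:partition0"));
        try {
            workerTaskName(null, "partitionedJob", "workerStep:partition0");
        } catch (NullPointerException e) {
            System.out.println("NPE: taskExecution was never set");
        }
    }
}
```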

In the database, the columns for the worker are all NULL (56 is the execution mapped to the batch job, and 57 is the worker execution id, based on the reproducing code above)

If the PartitionHandler is declared as a single @Bean in the configuration, without @StepScope, that task execution is eventually populated

+-------------------+---------------------+---------------------+--------------------------------------------------------------+-----------+--------------+---------------+---------------------+-------------------------------------+---------------------+
| TASK_EXECUTION_ID | START_TIME          | END_TIME            | TASK_NAME                                                    | EXIT_CODE | EXIT_MESSAGE | ERROR_MESSAGE | LAST_UPDATED        | EXTERNAL_EXECUTION_ID               | PARENT_EXECUTION_ID |
+-------------------+---------------------+---------------------+--------------------------------------------------------------+-----------+--------------+---------------+---------------------+-------------------------------------+---------------------+
|                56 | 2021-08-06 14:48:59 | 2021-08-06 14:48:59 | application                                                  |         0 | NULL         | NULL          | 2021-08-06 14:48:59 | NULL                                |                NULL |
|                57 | NULL                | NULL                | NULL                                                         |      NULL | NULL         | NULL          | 2021-08-06 14:48:59 | NULL                                |                NULL |
+-------------------+---------------------+---------------------+--------------------------------------------------------------+-----------+--------------+---------------+---------------------+-------------------------------------+---------------------+

My current hypothesis: if there is more than one PartitionHandler, or the PartitionHandler is @StepScope, the PartitionHandler is not created at the same time as the job. The @BeforeTask-annotated method inside DeployerPartitionHandler is then never triggered, which leaves taskExecution null

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 269

	@BeforeTask
	public void beforeTask(TaskExecution taskExecution) {
		this.taskExecution = taskExecution;

		if (this.commandLineArgsProvider == null) {
			SimpleCommandLineArgsProvider provider = new SimpleCommandLineArgsProvider(
					taskExecution);
			this.commandLineArgsProvider = provider;

		}
	}
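
The hypothesis can be modelled in plain Java. This is only a sketch of the idea, not how Spring Cloud Task actually dispatches @BeforeTask: `BeforeTaskCallback`, `HandlerModel` and `LifecycleModel` are hypothetical names, and a String stands in for the TaskExecution. A handler that exists when the task starts receives the callback; one created afterwards (for example per step) never does, so its field stays null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface standing in for a @BeforeTask-annotated method.
interface BeforeTaskCallback {
    void beforeTask(String taskExecution);
}

// Hypothetical handler whose taskExecution is only ever set through the callback.
class HandlerModel implements BeforeTaskCallback {
    String taskExecution; // stays null until beforeTask(...) runs

    @Override
    public void beforeTask(String taskExecution) {
        this.taskExecution = taskExecution;
    }
}

class LifecycleModel {
    private final List<BeforeTaskCallback> callbacks = new ArrayList<>();

    void register(BeforeTaskCallback callback) {
        callbacks.add(callback);
    }

    // Fires once at task start; objects created afterwards are never notified.
    void startTask(String taskExecution) {
        for (BeforeTaskCallback callback : callbacks) {
            callback.beforeTask(taskExecution);
        }
    }

    public static void main(String[] args) {
        LifecycleModel lifecycle = new LifecycleModel();
        HandlerModel eager = new HandlerModel();
        lifecycle.register(eager);
        lifecycle.startTask("task-56");

        HandlerModel late = new HandlerModel(); // created after the task started
        System.out.println("eager handler sees: " + eager.taskExecution);
        System.out.println("late handler sees: " + late.taskExecution);
    }
}
```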

@nicolasduminil
Author

Hi, just a short note to mention that, in my case, the same behaviour appears when the partition handlers are annotated with @Bean. However, if the partition handler is annotated with both @Bean and @StepScope, the exception raised is the one described in issue #792. In that case, the exception is raised even if there is only one partition handler, as long as it is annotated with @StepScope.

@danilko

danilko commented Aug 7, 2021

After digging further, I have a theory and a workaround (others may need to confirm it; I have not found any issue in testing so far).

Root cause analysis (hypothesis)

DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in the TaskExecution object as part of task setup.

But when the partition handler is @StepScope (instead of a plain @Bean alongside @EnableTask), or when there are two partition handlers, that setup is no longer triggered, as @EnableTask does not seem able to locate a single partition handler during creation.

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 269

As a result, the created DeployerPartitionHandler hits a null taskExecution when it tries to launch workers (because it was never set).

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 347

Workaround

The workaround uses the current job execution id to retrieve the associated task execution id.
It then fetches that task execution and passes it to the DeployerPartitionHandler to satisfy its need for a taskExecution reference.
It seems to work, but it is still unclear whether there are side effects (none found in testing so far).

Full code can be found in https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope_workaround_resolution

In the partitionHandler method

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

...

        // After the declaration of the partition handler
        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep"),
                        taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // This handler is no longer created at the task level, so its @BeforeTask callback is never invoked.
        // DeployerPartitionHandler relies on @BeforeTask to receive the TaskExecution during task setup.
        // Because the handler is now @StepScope (instead of a plain @Bean alongside @EnableTask),
        // that setup is not triggered and the handler is left with a null taskExecution.

        // Workaround: use the current job execution id to look up the associated task execution id,
        // fetch that TaskExecution, and pass it to the handler to satisfy its taskExecution reference.
        // It seems to work, but it is not yet clear whether there are side effects (none found in testing so far).
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(executionId);
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);
...

// rest of the code continues
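
One possible refinement of the lookup, sketched in plain Java: fail fast with a descriptive message if either lookup comes back empty, rather than passing a null into beforeTask and hitting the same NPE later. `ExplorerStub` and `TaskExecutionLookup` are hypothetical stand-ins, not Spring Cloud Task classes; the stub only borrows the two method names used in the workaround, and the assumption that a lookup may return null is mine. The ids 22 and 54 are sample values taken from the log above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two explorer lookups used in the workaround.
class ExplorerStub {
    final Map<Long, Long> taskIdByJobId = new HashMap<>();
    final Map<Long, String> taskById = new HashMap<>();

    // Assumption: returns null when no task execution is mapped to the job execution.
    Long getTaskExecutionIdByJobExecutionId(long jobExecutionId) {
        return taskIdByJobId.get(jobExecutionId);
    }

    // A String stands in for the TaskExecution object.
    String getTaskExecution(long taskExecutionId) {
        return taskById.get(taskExecutionId);
    }
}

class TaskExecutionLookup {
    // Resolves the task execution for a job execution, or throws with a clear message.
    static String require(ExplorerStub explorer, long jobExecutionId) {
        Long taskId = explorer.getTaskExecutionIdByJobExecutionId(jobExecutionId);
        if (taskId == null) {
            throw new IllegalStateException("No task execution mapped to job execution " + jobExecutionId);
        }
        String taskExecution = explorer.getTaskExecution(taskId);
        if (taskExecution == null) {
            throw new IllegalStateException("Task execution " + taskId + " not found");
        }
        return taskExecution;
    }

    public static void main(String[] args) {
        ExplorerStub explorer = new ExplorerStub();
        explorer.taskIdByJobId.put(22L, 54L);
        explorer.taskById.put(54L, "task-54");
        System.out.println(require(explorer, 22L));
    }
}
```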

@nicolasduminil
Author

nicolasduminil commented Aug 7, 2021

Hi, thanks for your comments. I'm not sure what exactly you mean by:
"But as this partionerHandler is now at @StepScope (instead of directly at @Bean level with @EnableTask) or there are two partitionHandler, that setup is no longer triggered, as @EnableTask seem not able to locate one partitionhandler during creation."
In my case (issue #793), the partition handler is at bean level, as it is annotated with @Bean.
My other case (issue #792) shows that having the partition handler annotated with @Bean and @StepScope raises org.springframework.cloud.task.listener.TaskExecutionException and not an NPE.
In any case, the @EnableTask annotation is in effect as it decorates the main class.
I'll try your workaround next week and I'll provide my results.
Many thanks for your work and efforts.

@danilko

danilko commented Aug 8, 2021

Hello,

Sorry, my earlier post was unclear, but the workaround above should work for both cases.

What I meant is that DeployerPartitionHandler's beforeTask (which is triggered via @BeforeTask) is only called by default under the following conditions:

  1. There is exactly one partitionHandler method
  2. It is a plain @Bean (not @StepScope or @JobScope), so it is created at the same time as the rest of the @EnableTask setup
    @Bean
    public PartitionHandler partitionHandler(...

In any other case beforeTask is not called, so the taskExecution inside the handler stays null.

#793 relates to condition 1: beforeTask in DeployerPartitionHandler is not called if there are two PartitionHandler beans (maybe the task only populates one of them, or there is some other odd behaviour?)
#792 relates to condition 2: the bean is not created at the same time as the task beans (but rather per step), so `beforeTask` in `DeployerPartitionHandler` is not called

So in both cases, the workaround is to call beforeTask manually with the correct taskExecution object after creating the handler.

@nicolasduminil
Author

nicolasduminil commented Aug 9, 2021

Hello,
Many thanks again for your update. Here are my results:

In the initial issue I registered (#791), I was wondering how I could avoid hard-coding the step name in the partitionHandler() method. For the record, the current example does this:

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {
	Resource resource = this.resourceLoader
		.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
          ...

So my question was how to avoid hard-coding the step name (here "workerStep"). Looking for an answer, I thought of adding an additional parameter of type String containing the step name. This way, the partitionHandler() method would become:

  @Bean
  public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository, String stepName) throws Exception {
	Resource resource = this.resourceLoader
		.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, stepName);
            ...

And in order to initialize the stepName I would have had:

  @Bean
  public Step step1()
  {
    return this.stepBuilderFactory.get("step1")
      .partitioner(workerStep().getName(), partitioner())
      .step(workerStep())
      .partitionHandler(partitionHandler(taskLauncher, jobExplorer, taskRepository, "workerStep"))
      .build();
  }

And in order to be able to pass the step name from the step definition, I needed to annotate the partitionHandler method with @StepScope. Then I experienced the exception described in the second issue (#792).

My next attempt was to have several dedicated partition handlers, each one hard-coding the name of its associated partitioned worker step. This no longer required annotating the partition handler methods with @StepScope, but I then got the NPE described in issue #793.

Now I have tested your workaround and I think it works. The partitionHandler() method now looks as follows:

  @Bean
  @StepScope
  public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, @Value("#{stepName}") String stepName, @Value("#{stepExecution}") StepExecution stepExecution)
  {
    Resource resource = this.resourceLoader
      .getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT");
    DeployerPartitionHandler partitionHandler =
      new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, stepName, taskRepository);
    TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
    partitionHandler.beforeTask(taskExecution);
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler
      .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler
      .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");
    return partitionHandler;
  }

Running it, I no longer get either the NPE or the java.lang.IllegalStateException: No context holder available for step scope. The commands are now being started, as the log file shows:

2021-08-09 19:27:51.419  INFO 2872 --- [           main] o.s.c.d.spi.local.LocalTaskLauncher      : Command to be executed:  /usr/lib/jvm/jdk-11.0.11/bin/java -jar /home/nicolas/.m2/repository/fr/simplex_software/tests/partitioned-job/1.0-SNAPSHOT /partitioned-job-1.0-SNAPSHOT.jar --spring.profiles.active=worker --spring.cloud.task.initialize-enabled=false  --spring.batch.initializer.enabled=false --spring.cloud.task.job-execution-id=1 --spring.cloud.task.step-execution-id=2 --spring.cloud.task.step-name=step1 --spring.cloud.task.name=PartitionedBatchJobTask_partitionedJob1258533368_workerStep:partition3 --spring.cloud.task.parentExecutionId=1 --spring.cloud.task.executionid=4
2021-08-09 19:27:51.422  INFO 2872 --- [           main] o.s.c.d.spi.local.LocalTaskLauncher      : launching task  PartitionedBatchJobTask-c35a02f7-9fd4-4409-a447-e3100cd8a765
   Logs will be in /tmp/271916935658/PartitionedBatchJobTask-c35a02f7-9fd4-4409-a447-e3100cd8a765

However, the command execution fails:

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-4.3.3.jar:4.3.3]

Looking in the log files I got:

2021-08-09 19:27:48.927 ERROR 2947 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error  executing step step1 in job partitionedJob1258533368

org.springframework.batch.core.JobExecutionException: Cannot restart step from STARTING status.  The old execution may still be executing, so you may need to verify manually that this is the case.
    at org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter.shouldStart(SimpleStepExecutionSplitter.java:319) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter.getStartable(SimpleStepExecutionSplitter.java:279) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter.isStartable(SimpleStepExecutionSplitter.java:250) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter.split(SimpleStepExecutionSplitter.java:188) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:285) ~[spring-cloud-task-batch-2.3.3.jar!/:2.3.3]
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.cloud.task.batch.partition.DeployerStepExecutionHandler.run(DeployerStepExecutionHandler.java:113) ~[spring-cloud-task-batch-2.3.3.jar!/:2.3.3]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) ~[spring-boot-2.5.2.jar!/:2.5.2]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:775) ~[spring-boot-2.5.2.jar!/:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:345) ~[spring-boot-2.5.2.jar!/:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.2.jar!/:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332) ~[spring-boot-2.5.2.jar!/:2.5.2]

I'm not sure whether this exception has anything to do with your workaround or whether it's just a mistake of mine somewhere; I need to double-check. Could you please confirm that it is not raised in your case?
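
One possible reading of the log and trace above, offered as an assumption rather than a confirmed diagnosis: the launched command carries --spring.cloud.task.step-name=step1, and the worker's stack trace passes through PartitionStep.doExecute and DeployerPartitionHandler.handle. That would be consistent with the worker being asked to run the partitioned step (step1) again instead of the worker step, which could happen if #{stepName} resolves to the name of the step currently executing. The sketch below is plain Java with no Spring dependency; WorkerCommandCheck is a hypothetical helper that only extracts the step name from such a command line so it can be compared with the expected worker step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Spring Cloud Task: it reads the step name
// out of a worker command line like the one logged by LocalTaskLauncher.
final class WorkerCommandCheck {

    private static final String STEP_NAME_FLAG = "--spring.cloud.task.step-name=";

    private WorkerCommandCheck() {
    }

    // Returns the value of --spring.cloud.task.step-name, if present.
    static Optional<String> stepName(List<String> commandLineArgs) {
        return commandLineArgs.stream()
                .filter(arg -> arg.startsWith(STEP_NAME_FLAG))
                .map(arg -> arg.substring(STEP_NAME_FLAG.length()))
                .findFirst();
    }

    // True when the worker was asked to run the expected worker step
    // rather than, for example, the partitioned step itself.
    static boolean targetsWorkerStep(List<String> commandLineArgs, String expectedWorkerStep) {
        return stepName(commandLineArgs).map(expectedWorkerStep::equals).orElse(false);
    }

    public static void main(String[] unused) {
        // Arguments copied from the log line above, trimmed to the relevant ones.
        List<String> logged = Arrays.asList(
                "--spring.profiles.active=worker",
                "--spring.cloud.task.job-execution-id=1",
                "--spring.cloud.task.step-execution-id=2",
                "--spring.cloud.task.step-name=step1");

        System.out.println("step name sent to worker: " + stepName(logged).orElse("<none>"));
        System.out.println("targets workerStep: " + targetsWorkerStep(logged, "workerStep"));
    }
}
```

If the check reports that the worker does not target workerStep, the value passed as the fourth constructor argument of DeployerPartitionHandler would be the first thing to inspect.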

Also, could you please suggest a recommended way to avoid hard-coding the step name? Would it be better to have a single partition handler bean that takes the step name as a constructor argument? If so, what would be the best way to do it? Or would it be better to have as many dedicated partition handler beans as there are partitioned worker steps, each one hard-coding its step name?

Many thanks in advance.

@danilko

danilko commented Aug 9, 2021

Hello,

Sorry, I will need to answer the questions in reverse order, as that may explain things better.

I actually faced the issue above (The old execution may still be executing) when there are two partition handler methods (in your case, partitionHandler and partitionHandler2). I am still not able to resolve it.

That is why I ended up with only one partition handler plus one splitter (partitioner), and I use the StepExecution to differentiate between steps. After that I no longer needed the two-partition-handler use case: the incoming stepExecution.getStepName() tells me which step is using the partition handler, so I can provide a different worker step name and a different resource (Docker image) for each. Other settings (the gridSize for the partitioner, and the CPU/memory settings for the taskLauncher) can be handled in a similar fashion (code example below).

Actually, after thinking about it, I may not have a solution for this issue (#793). Instead, I suggest using the approach from #792, with this workaround plus the stepExecution to configure the partition handler for each use case.

This also answers your first question about how to pass in the step name dynamically (sort of): I use the stepExecution to find the name of the triggering step (in your example, one step is step1) and, based on it, assign the proper resource and worker step (code example below).

You can see the full config in
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope_workaround_resolution/src/main/java/com/example/batchprocessing/BatchConfiguration.java

PartitionHandler (note that it uses the stepExecution to find the name of the current triggering step and assigns a different worker step accordingly).
The worker step name in this case comes from the pre-defined job execution context, but it may be able to come from a job parameter or another place too.

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

        String step ="processor";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
            step = "reader";
        }

        // Use local build image
        DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));


        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the task execution manually: this partition handler is no longer created at task level, so @BeforeTask is no longer applied
        // The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in the TaskExecution object as part of task setup
        // But as this partition handler is now @StepScope (instead of a plain @Bean with @EnableTask), that setup is no longer triggered
        // As a result, the created DeployerPartitionHandler ends up with a null TaskExecution

        // Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
        // From there, it gets that task execution and passes it to the DeployerPartitionHandler to satisfy its need for a TaskExecution reference
        // It seems to work, but it is still not clear whether there are other side effects (none found during testing so far)
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);

        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        partitionHandler.setApplicationName(taskName_prefix + step);

        return partitionHandler;
    }
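
The step-name-based selection in the handler above can also be sketched as a lookup table. This is a minimal illustration in plain Java, with no Spring types; WorkerSettings and WorkerSettingsRegistry are hypothetical names introduced here, and the values mirror the ones used in this comment (worker:latest, workerStepReader, workerStepProcessor).

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical holder for the per-step values that the handler above reads
// from the job execution context (worker image, worker step, grid size).
final class WorkerSettings {
    final String image;
    final String workerStep;
    final int gridSize;

    WorkerSettings(String image, String workerStep, int gridSize) {
        this.image = image;
        this.workerStep = workerStep;
        this.gridSize = gridSize;
    }
}

// Hypothetical registry keyed by the name of the partitioned (manager) step.
final class WorkerSettingsRegistry {
    private final Map<String, WorkerSettings> byManagerStep = new HashMap<>();

    void register(String managerStepName, WorkerSettings settings) {
        byManagerStep.put(managerStepName.toLowerCase(Locale.ROOT), settings);
    }

    // Case-insensitive lookup, mirroring equalsIgnoreCase in the handler above.
    WorkerSettings forStep(String managerStepName) {
        WorkerSettings settings = byManagerStep.get(managerStepName.toLowerCase(Locale.ROOT));
        if (settings == null) {
            throw new IllegalArgumentException("No worker settings registered for step " + managerStepName);
        }
        return settings;
    }

    public static void main(String[] unused) {
        WorkerSettingsRegistry registry = new WorkerSettingsRegistry();
        registry.register("partitionReaderStep", new WorkerSettings("worker:latest", "workerStepReader", 1));
        registry.register("partitionProcessorStep", new WorkerSettings("worker:latest", "workerStepProcessor", 2));

        WorkerSettings reader = registry.forStep("partitionReaderStep");
        System.out.println(reader.workerStep + " with grid size " + reader.gridSize);
    }
}
```

Unlike the if/else in the handler, which treats every step other than partitionReaderStep as the processor, this sketch rejects unknown step names. That is a design choice made for the illustration, not something the original code does.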

Partitioner (note it utilize stepExecution context to find out the step name and therefore assign different gridSize)

    @Bean
    @StepScope
    public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {

                Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

                int targetGridSize = 0;

                if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                {
                    targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString("readerGridSize"));
                }
                else
                {
                    targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString("processorGridSize"));
                }


                for (int i = 0; i < targetGridSize; i++) {
                    ExecutionContext context1 = new ExecutionContext();
                    context1.put("partitionNumber", i);

                    partitions.put("partition" + i, context1);
                }

                return partitions;
            }
        };
    }
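
To make the shape of the partitioner's result concrete, here is a minimal sketch in plain Java. It uses nested maps as a stand-in for Spring Batch's ExecutionContext (an assumption made so that the example runs without Spring); PartitionSketch is a hypothetical name, and the key names partition<i> and partitionNumber are the ones used in the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Spring-free stand-in for the partitioner above: each partition
// is a named map carrying the partition number the worker will read.
final class PartitionSketch {

    private PartitionSketch() {
    }

    static Map<String, Map<String, Object>> partition(int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be at least 1, was " + gridSize);
        }
        // LinkedHashMap keeps the partitions in creation order for readable output.
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("partitionNumber", i);
            partitions.put("partition" + i, context);
        }
        return partitions;
    }

    public static void main(String[] unused) {
        // prints {partition0={partitionNumber=0}, partition1={partitionNumber=1}}
        System.out.println(partition(2));
    }
}
```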

Worker step definitions (note that there are three beans: the step execution handler and two worker steps)

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean(name = "workerStepReader")
    public Step workerStepReader() {
        return this.stepBuilderFactory.get("workerStepReader")
                .tasklet(workerTaskletReader(null))
                .build();
    }

    @Bean(name = "workerStepProcessor")
    public Step workerStepProcessor() {
        return this.stepBuilderFactory.get("workerStepProcessor")
                .tasklet(workerTaskletProcessor(null))
                .build();
    }

Job setup

    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

    @Bean(name = "partitionReaderStep")
    public Step partitionReaderStep() throws Exception {

        return stepBuilderFactory.get("partitionReaderStep")
                .partitioner(workerStepReader().getName(),  partitioner( null))
                .step(workerStepReader())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

    @Bean(name = "partitionProcessorStep")
    public Step partitionProcessorStep() throws Exception {

        return stepBuilderFactory.get("partitionProcessorStep")
                .partitioner(workerStepProcessor().getName(), partitioner( null))
                .step(workerStepProcessor())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

The values below could come from job parameters instead (so that they can be passed in as configuration); I just used the job execution context.


    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerGridSize", "1");

            // For now using same image for reader/processor, but if it work, will split them
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorGridSize", "2");

            // For now using same image for reader/processor, but if it work, will split them
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerGridSize == " + jobExecution.getExecutionContext().getString("readerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }

Full class implementation

package com.example.batchprocessing;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.deployer.resource.docker.DockerResource;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.deployer.spi.kubernetes.*;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.SystemEnvironmentPropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.StringUtils;

import java.util.*;


@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {

    private static int BACK_OFF_LIMIT = 6;

    // Set the Kubernetes job name
    private String taskName_prefix="partitionedbatchjob";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExplorer jobExplorer;

    @Autowired
    public JobRepository jobRepository;

    @Autowired
    public TaskExecutor taskExecutor;

    @Autowired
    public TaskRepository taskRepository;

    @Autowired
    public TaskExplorer taskExplorer;

    @Autowired
    private ConfigurableApplicationContext context;

    @Autowired
    private DelegatingResourceLoader resourceLoader;

    @Autowired
    private Environment environment;

    @Bean
    @StepScope
    public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {

                Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

                int targetGridSize = 0;
                String step = "";
                if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                {
                    step = "reader";
                }
                else
                {
                    step = "processor";
                }

                targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));

                for (int i = 0; i < targetGridSize; i++) {
                    ExecutionContext context1 = new ExecutionContext();
                    context1.put("partitionNumber", i);

                    partitions.put("partition" + i, context1);
                }

                return partitions;
            }
        };
    }

    @Bean
    public KubernetesClient kuberentesClient()
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();

        return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
    }


    @Bean
    @StepScope
    public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
        kubernetesDeployerProperties.setNamespace("default");

        kubernetesDeployerProperties.setCreateJob(true);

        // Database setup to reference configmap for database info
        List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
        KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
        configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
        configMapKeyRefList.add(configMapKeyRef);


        kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);

        // Set request resource
        KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
        KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();

        String step = "";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
        {
            step="reader";
        }
        else
        {
            step="processor";
        }

        request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
        request.setMemory("2000Mi");


        limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
        limit.setMemory("3000Mi");


        kubernetesDeployerProperties.setRequests(request);
        kubernetesDeployerProperties.setLimits(limit);

        // as build on local image, so need to use local
        kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);

        // Set task launcher properties to not repeat and not restart
        KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();

        // https://kubernetes.io/docs/concepts/workloads/controllers/job/
        // Set to never to create new pod on restart
        kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
        kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
        KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
                kubernetesTaskLauncherProperties, kuberentesClient());

        return kubernetesTaskLauncher;
    }


    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

    @Bean(name = "partitionReaderStep")
    public Step partitionReaderStep() throws Exception {

        return stepBuilderFactory.get("partitionReaderStep")
                .partitioner(workerStepReader().getName(),  partitioner( null))
                .step(workerStepReader())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

    @Bean(name = "partitionProcessorStep")
    public Step partitionProcessorStep() throws Exception {

        return stepBuilderFactory.get("partitionProcessorStep")
                .partitioner(workerStepProcessor().getName(), partitioner( null))
                .step(workerStepProcessor())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }


    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

        String step ="processor";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
            step = "reader";
        }

        // Use local build image
        DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));


        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the task execution manually: this partition handler is no longer created at task level, so @BeforeTask is no longer applied
        // The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in the TaskExecution object as part of task setup
        // But as this partition handler is now @StepScope (instead of a plain @Bean with @EnableTask), that setup is no longer triggered
        // As a result, the created DeployerPartitionHandler ends up with a null TaskExecution

        // Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
        // From there, it gets that task execution and passes it to the DeployerPartitionHandler to satisfy its need for a TaskExecution reference
        // It seems to work, but it is still not clear whether there are other side effects (none found during testing so far)
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);

        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        partitionHandler.setApplicationName(taskName_prefix + step);

        return partitionHandler;
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");

            // For now using same image for reader/processor, but if it work, can split them
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");

            // For now using same image for reader/processor, but if it work, will split them
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean(name = "workerStepReader")
    public Step workerStepReader() {
        return this.stepBuilderFactory.get("workerStepReader")
                .tasklet(workerTaskletReader(null))
                .build();
    }

    @Bean(name = "workerStepProcessor")
    public Step workerStepProcessor() {
        return this.stepBuilderFactory.get("workerStepProcessor")
                .tasklet(workerTaskletProcessor(null))
                .build();
    }



    @Bean
    @StepScope
    public Tasklet workerTaskletReader(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletReader ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    @StepScope
    public Tasklet workerTaskletProcessor(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }
}

@nicolasduminil
Author

nicolasduminil commented Aug 10, 2021

Hello,

Your answers and suggestions are highly appreciated, thank you very much. To keep you updated: after following your workaround and your sample code, I ended up with the following partition handler:

@Bean
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, @Value("#{stepExecution}") StepExecution stepExecution)
{
  Resource resource = this.resourceLoader.getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT");
  DeployerPartitionHandler partitionHandler =
    new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
      stepExecution.getJobExecution().getExecutionContext().getString("workerStep"), taskRepository);
  TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
  partitionHandler.beforeTask(taskExecution);
  List<String> commandLineArgs = new ArrayList<>(3);
  commandLineArgs.add("--spring.profiles.active=worker");
  commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
  commandLineArgs.add("--spring.batch.initializer.enabled=false");
  partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
  partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
  partitionHandler.setMaxWorkers(2);
  partitionHandler.setApplicationName("PartitionedBatchJobTask");
  return partitionHandler;
}

So there is only one partition handler, annotated with @Bean and @StepScope. Unfortunately, running that doesn't work in my case; it raises the following exception:

...
Caused by: java.lang.ClassCastException: Value for key=[workerStep] is not of type: [class java.lang.String], it is [null]

...

This happens in this line of code:

stepExecution.getJobExecution().getExecutionContext().getString("workerStep");

Setting a breakpoint there, I can see that the execution context returned by stepExecution.getJobExecution().getExecutionContext()
is empty. This is not surprising, as the step invoking the partition handler is as follows:

  @Bean
  public Step step1()
  {
    return this.stepBuilderFactory.get("step1")
      .partitioner(workerStep().getName(), partitioner())
      .step(workerStep())
      .partitionHandler(partitionHandler(taskLauncher, jobExplorer, null))
      .build();
  }

Here the argument passed as stepExecution is null which is probably why the execution context is empty.

I then modified the partition handler as follows:

  @Bean
  @StepScope
  public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, @Value("#{stepName}") String stepName, @Value("#{stepExecution}") StepExecution stepExecution)
  {
    Resource resource = this.resourceLoader.getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT");
    DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, stepName, taskRepository);
    ...

}

The idea is to pass the step name as an input argument to the partition handler instead of taking it from the execution context. Accordingly, the step definition invoking the partition handler looks like this:

  @Bean
  public Step step1()
  {
    return this.stepBuilderFactory.get("step1")
      .partitioner(workerStep().getName(), partitioner())
      .step(workerStep())
      .partitionHandler(partitionHandler(taskLauncher, jobExplorer, "workerStep", null))
      .build();
  }

Now, when I run it, the previous exception is gone and the commands are started but, after a while, all the commands return

org.springframework.batch.core.JobExecutionException: Cannot restart step from STARTED status.  The old execution may still be executing, so you may need to verify manually that this is the case.

I have committed my small project here: https://github.com/nicolasduminil/partitioned-job.git. It's essentially the original example, slightly modified in order to apply your workaround. I'd be very grateful if you could find the time to have a look at it and let me know if you see something.
Basically, the issue is the initialization of the stepExecution argument, which is null in my case as I don't know how to initialize it. But looking at your code, I can see it is null there as well.

Many thanks in advance for enlightening me and clarifying this point.

@nicolasduminil
Author

Last but not least, if someone from the maintainers' team could provide some guidance on getting the code working, it would be great!

@sksastry

sksastry commented Aug 10, 2021

Thank you both for your comments. I was able to apply the workaround provided by @danilko and very much appreciate your effort on this. To get around the ClassCastException, I replaced the line

stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")

with my own step-name specific to the step-scope.

My use case is a job with multiple steps, where each step is remote-partitioned and each step may have a completely different DeployerPartitionHandler, which could include a different Resource, different command-line args, and even a different max-workers setting.
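
A minimal sketch of how such per-step settings could be kept apart, assuming they are looked up by the name of the partitioned step. HandlerSettings and HandlerSettingsRegistry are hypothetical names used only for illustration; they are not part of Spring Batch or Spring Cloud Task, and the sketch deliberately uses plain Java so it does not depend on either library:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for the settings that may differ between partitioned steps.
final class HandlerSettings {
    final String resourceUri;
    final List<String> commandLineArgs;
    final int maxWorkers;

    HandlerSettings(String resourceUri, List<String> commandLineArgs, int maxWorkers) {
        this.resourceUri = resourceUri;
        // Defensive copy so later changes to the caller's list do not leak in.
        this.commandLineArgs = Collections.unmodifiableList(new ArrayList<>(commandLineArgs));
        this.maxWorkers = maxWorkers;
    }
}

// Hypothetical registry keyed by the name of the partitioned step.
final class HandlerSettingsRegistry {
    private final Map<String, HandlerSettings> byStepName = new HashMap<>();

    void register(String stepName, HandlerSettings settings) {
        byStepName.put(stepName, settings);
    }

    // Fails fast with a descriptive message when a step has no settings registered.
    HandlerSettings forStep(String stepName) {
        HandlerSettings settings = byStepName.get(stepName);
        if (settings == null) {
            throw new IllegalArgumentException("No partition handler settings for step: " + stepName);
        }
        return settings;
    }
}
```

In a step-scoped partition handler, the value of stepExecution.getStepName() could serve as the lookup key, and the returned settings could then feed the Resource, the command-line args provider, and setMaxWorkers.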

This is a real issue, and I would also ask the code maintainers to address it with a more formal fix.

@cppwfs
Collaborator

cppwfs commented Aug 10, 2021

In this case it seems we need to create a story for Spring Cloud Task to support multiple partition handlers.

@nicolasduminil
Author

@cppwfs: Does that mean that Spring Cloud Task currently isn't meant to support multiple partition handlers? I didn't see any note about that in the documentation. And what would be the rationale for supporting only one partition handler? Is there an assumption that a job or a flow has only one partitioned step? Because, since annotating the partition handler with @StepScope in order to pass arguments from steps isn't supported either, having several step-dedicated partition handlers is the only solution. Right?

@danilko

danilko commented Aug 15, 2021

@nicolasduminil
Regarding #793 (comment)

Sorry for the delay, I have been very busy with work-related tasks recently.

I did look at your code and created a branch in your repo with my suggested fixes (after these fixes it works for me in my local setup).

The job reaches the COMPLETED state, and I can see four Java processes being spawned dynamically during the run:

2021-08-14 18:56:56.546  INFO 82359 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionedJob]] completed with the following parameters: [{run.id=6}] and the following status: [COMPLETED] in 10s569ms
2021-08-14 18:56:56.547 DEBUG 82359 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Updating: TaskExecution with executionId=17 with the following {exitCode=0, endTime=Sat Aug 14 18:56:56 PDT 2021, 

The following is the list of changes I made in that branch.

The first one concerns the partition handler. In my test, you do not actually need to specify the JobExplorer and the other arguments, as they are already autowired as beans, so I simplified the signature to just:

public PartitionHandler partitionHandler(@Value("#{stepExecution}") StepExecution stepExecution)  {

Regarding the step name: based on my tests and previous setups, I would not recommend passing a plain value (one that is not a bean and not taken from a context), as it seems that when the system creates the bean such values behave strangely and end up null or otherwise unexpected.
So I would rather always use the context (which is why the signature above takes only the step execution), and in the actual code use the following:

public PartitionHandler partitionHandler(@Value("#{stepExecution}") StepExecution stepExecution)  {
  
    // This shows how to get the name of the current step (the step in the job that triggers this partition handler).
    // Note: in this code it is called step1, as that is the name the triggering step is defined with.
    String currentStepName = stepExecution.getStepName();

    // Use the job execution context to find the worker step.
    // The literal key step1_corresponding_worker_job would also work, but building the key
    // from the step name demonstrates the step -> worker step relation.
    String workerStepName = stepExecution.getJobExecution().getExecutionContext().getString(currentStepName + "_corresponding_worker_job");

The job context used above is populated by adding a listener to the job:

  @Bean
  @Profile("!worker")
  public Job partitionedJob()
  {
    return this.jobBuilderFactory.get("partitionedJob")
      .incrementer(new RunIdIncrementer())
      .listener(jobExecutionListener()) // <--- Add job execution listener to populate some setting, but can also use job parameters 
      .start(step1())
      .build();
  }

Then, in that JobExecutionListener, populate this relation so that it can be used in the partition handler (the alternative is to use job parameters or some sort of DB):

  @Bean
  public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener() {

      // Define some context to be used later in the partition handler
      @Override
      public void beforeJob(JobExecution jobExecution) {
        // Set up value to map step1 to corresponding worker step
        // Can use other way like job parameters too
        jobExecution.getExecutionContext().putString("step1_corresponding_worker_job", "workerStep");

      }
...
    return listener;
  }
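
The key convention used by the listener and the partition handler above can be sketched in isolation. This is a minimal illustration in plain Java, assuming a Map as a stand-in for the job execution context; WorkerStepLookup is a hypothetical helper and not part of Spring Batch. It also shows one way to fail with a clearer message than the ClassCastException reported earlier when the key has not been populated:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the Map stands in for the job execution context.
final class WorkerStepLookup {
    // Suffix taken from the key used in the listener above.
    static final String SUFFIX = "_corresponding_worker_job";

    // Builds the context key for a given partitioned step name.
    static String keyFor(String stepName) {
        return stepName + SUFFIX;
    }

    // Returns the worker step name mapped to the given step, or fails with a
    // descriptive message when the key is missing or is not a String.
    static String resolve(Map<String, Object> jobContext, String stepName) {
        Object value = jobContext.get(keyFor(stepName));
        if (!(value instanceof String)) {
            throw new IllegalStateException(
                "No worker step registered under key [" + keyFor(stepName) + "]");
        }
        return (String) value;
    }
}
```

With a context that contains the entry step1_corresponding_worker_job = workerStep, resolve(context, "step1") returns workerStep; with an empty context it throws the IllegalStateException instead.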

And, I think most importantly (though I am not 100% sure my analysis is correct), I always need to ensure that max workers is at least the grid size; otherwise the step runs into problems (which kind of makes sense: how would the grid be split among workers when there are fewer workers than partitions?).

public PartitionHandler partitionHandler(@Value("#{stepExecution}") StepExecution stepExecution)  {
...
partitionHandler.setMaxWorkers(GRID_SIZE); // My understanding is that max workers should be equal to or larger than the grid size

@nicolasduminil
Author

@danilko: Many thanks again for your help and support. Your contribution has been highly appreciated. Many thanks also for having dramatically simplified the provided code and for having made it work.
Kind regards,
