
partitioned-batch-job raises org.springframework.cloud.task.listener.TaskExecutionException when Partitioner is @StepScope #792

Open
nicolasduminil opened this issue Aug 6, 2021 · 1 comment


Hello,

I modified the partitioned-batch-job sample so that the partitionHandler bean definition reads as follows:

  @Bean
  @StepScope
  public DeployerPartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository, String stepName)
  {
    Resource resource = this.resourceLoader.getResource("...");
    DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, stepName);
    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");
    partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");
    return partitionHandler;
  }

And I also modified the partitionedJob method as follows:

@Bean
@Profile("!worker")
public Job partitionedJob(/*PartitionHandler partitionHandler*/)
{
  Random random = new Random();
  return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
    .start(step1(partitionHandler(taskLauncher, jobExplorer, taskRepository, "workerStep")))
    .build();
}

The goal is to use multiple steps and, hence, to be able to pass the step name as an input argument of the partitionHandler() method.

Running the job raises the following exception:

org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is 
org.springframework.cloud.task.listener.TaskExecutionException: Failed to process @BeforeTask or @AfterTask annotation 
because: Error creating bean with name 'scopedTarget.partitionHandler': Scope 'step' is not active for the current thread; 
consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is 
java.lang.IllegalStateException: No context holder available for step scope
      at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.8.jar:5.3.8]
...

I have found many similar cases on Stack Overflow but no working solutions. Please advise.

Many thanks in advance.


danilko commented Aug 9, 2021

Pasting this here from #793 (comment), since this is the workaround I tested against my own code without issues (and it actually relates more to this issue than to #793).

My hypothesis on the issue can be found in
#793 (comment)

For me, manually passing stepName as a plain String did not work (perhaps the bean is already pre-loaded at that point). The only way I was able to get it working is through the stepExecution parameter that Spring Batch passes in (sample below).

I ended up with only one partitionHandler and one partitioner, and used the StepExecutionContext to differentiate between steps. A second partitionHandler turned out to be unnecessary, because stepExecution.getStepName() tells me which step is using the handler, so I can supply a different worker step name and Docker image resource per step. Other settings (the gridSize for the partitioner, and different CPU/memory settings for the taskLauncher) can be handled the same way (code below).

This also answers your question about passing the step in dynamically (sort of): I use the stepExecution to find the triggering step name (in your example, step1) and, based on it, assign the proper resource and worker step (code below).
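The step-name-to-key-prefix mapping described above can be isolated into a small plain-Java helper. This is only an illustrative sketch (the class and method names are not part of the original code); the key suffixes match the execution-context keys used in the configuration below.

```java
// Maps a partitioned step's name to the key prefix used when looking up
// per-step settings (image, worker step) in the job's ExecutionContext.
// Illustrative helper; names are not from the original code.
final class StepConfigKeys {
    private StepConfigKeys() {}

    static String prefixFor(String stepName) {
        // Same rule as in the @StepScope beans: the reader step maps to
        // "reader"; everything else falls back to "processor".
        return "partitionReaderStep".equalsIgnoreCase(stepName) ? "reader" : "processor";
    }

    static String workerImageKey(String stepName) {
        return prefixFor(stepName) + "WorkerImage";
    }

    static String workerStepKey(String stepName) {
        return prefixFor(stepName) + "WorkerStep";
    }
}
```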

You can see the full config in
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope_workaround_resolution/src/main/java/com/example/batchprocessing/BatchConfiguration.java

PartitionHandler (note that it uses the stepExecution context to find the current triggering step name and assigns a different worker step accordingly). The worker step name in this case comes from the pre-defined job execution context, but it could come from a job parameter or another place too.

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

        String step ="processor";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
            step = "reader";
        }

        // Use local build image
        DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));


        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the TaskExecution manually: this partition handler is no longer created at
        // the task level, so its @BeforeTask hook is never invoked.
        // DeployerPartitionHandler relies on the @BeforeTask annotation to receive the
        // TaskExecution object during task setup, but with the bean at @StepScope
        // (instead of a plain @Bean alongside @EnableTask) that setup never runs,
        // leaving the created DeployerPartitionHandler with a null TaskExecution.

        // The workaround below uses the current job execution id to look up the
        // associated task execution id, fetches that TaskExecution, and passes it to
        // the handler to satisfy its need for a TaskExecution reference.
        // It seems to work, but it is unclear whether there are side effects
        // (none found so far during testing).
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);

        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        partitionHandler.setApplicationName(taskName_prefix + step);

        return partitionHandler;
    }

Partitioner (note that it uses the stepExecution context to find the step name and assigns a different gridSize accordingly)

    @Bean
    @StepScope
    public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {

                Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

                int targetGridSize = 0;

                if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                {
                    targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString("readerWorkerGridSize"));
                }
                else
                {
                    targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString("processorWorkerGridSize"));
                }


                for (int i = 0; i < targetGridSize; i++) {
                    ExecutionContext context1 = new ExecutionContext();
                    context1.put("partitionNumber", i);

                    partitions.put("partition" + i, context1);
                }

                return partitions;
            }
        };
    }

Worker-side definitions; notice there are three beans: the step execution handler and the two worker steps.

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean(name = "workerStepReader")
    public Step workerStepReader() {
        return this.stepBuilderFactory.get("workerStepReader")
                .tasklet(workerTaskletReader(null))
                .build();
    }

    @Bean(name = "workerStepProcessor")
    public Step workerStepProcessor() {
        return this.stepBuilderFactory.get("workerStepProcessor")
                .tasklet(workerTaskletProcessor(null))
                .build();
    }

Job setup

    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

    @Bean(name = "partitionReaderStep")
    public Step partitionReaderStep() throws Exception {

        return stepBuilderFactory.get("partitionReaderStep")
                .partitioner(workerStepReader().getName(),  partitioner( null))
                .step(workerStepReader())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

    @Bean(name = "partitionProcessorStep")
    public Step partitionProcessorStep() throws Exception {

        return stepBuilderFactory.get("partitionProcessorStep")
                .partitioner(workerStepProcessor().getName(), partitioner( null))
                .step(workerStepProcessor())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

The values below could also be supplied as job parameters (so they can be passed in from outside); I just used the job execution context.


    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");

            // For now using the same image for reader and processor; if it works, they can be split
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");

            // For now using the same image for reader and processor; if it works, they can be split
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }
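As a sketch of the job-parameters alternative mentioned above (hypothetical launcher code: `jobLauncher` and `partitionedJob` are assumed to be injected, and the key names mirror the execution-context keys used in the listener):

```java
// Hypothetical sketch: launch the job with the per-step settings as job
// parameters instead of seeding them in beforeJob(). Key names mirror the
// execution-context keys above; this fragment assumes an injected JobLauncher
// and the partitionedJob bean.
JobParameters params = new JobParametersBuilder()
        .addString("readerWorkerGridSize", "1")
        .addString("processorWorkerGridSize", "2")
        .addString("readerWorkerImage", "worker:latest")
        .addString("processorWorkerImage", "worker:latest")
        // unique identifying parameter so each launch creates a new JobInstance
        .addLong("run.id", System.currentTimeMillis())
        .toJobParameters();
jobLauncher.run(partitionedJob, params);

// The @StepScope beans would then read the values via late binding, e.g.
// @Value("#{jobParameters['readerWorkerGridSize']}") String readerWorkerGridSize
```

The step-scoped beans would then read `jobParameters[...]` instead of `stepExecution.getJobExecution().getExecutionContext()`.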

Full class implementation

package com.example.batchprocessing;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.deployer.resource.docker.DockerResource;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.deployer.spi.kubernetes.*;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.SystemEnvironmentPropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.StringUtils;

import java.util.*;


@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {

    private static int BACK_OFF_LIMIT = 6;

    // Prefix for the Kubernetes job name
    private String taskName_prefix="partitionedbatchjob";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExplorer jobExplorer;

    @Autowired
    public JobRepository jobRepository;

    @Autowired
    public TaskExecutor taskExecutor;

    @Autowired
    public TaskRepository taskRepository;

    @Autowired
    public TaskExplorer taskExplorer;

    @Autowired
    private ConfigurableApplicationContext context;

    @Autowired
    private DelegatingResourceLoader resourceLoader;

    @Autowired
    private Environment environment;

    @Bean
    @StepScope
    public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {

                Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

                int targetGridSize = 0;
                String step = "";
                if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                {
                    step = "reader";
                }
                else
                {
                    step = "processor";
                }

                targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));

                for (int i = 0; i < targetGridSize; i++) {
                    ExecutionContext context1 = new ExecutionContext();
                    context1.put("partitionNumber", i);

                    partitions.put("partition" + i, context1);
                }

                return partitions;
            }
        };
    }

    @Bean
    public KubernetesClient kuberentesClient()
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();

        return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
    }


    @Bean
    @StepScope
    public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
        kubernetesDeployerProperties.setNamespace("default");

        kubernetesDeployerProperties.setCreateJob(true);

        // Database setup to reference configmap for database info
        List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
        KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
        configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
        configMapKeyRefList.add(configMapKeyRef);


        kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);

        // Set request resource
        KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
        KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();

        String step = "";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
        {
            step="reader";
        }
        else
        {
            step="processor";
        }

        request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
        request.setMemory("2000Mi");


        limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
        limit.setMemory("3000Mi");


        kubernetesDeployerProperties.setRequests(request);
        kubernetesDeployerProperties.setLimits(limit);

        // The image is built locally, so pull it only if not already present
        kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);

        // Set task launcher properties to not repeat and not restart
        KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();

        // https://kubernetes.io/docs/concepts/workloads/controllers/job/
        // Restart policy Never: failed pods are not restarted in place; the job
        // controller creates new pods up to the backoff limit
        kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
        kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
        KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
                kubernetesTaskLauncherProperties, kuberentesClient());

        return kubernetesTaskLauncher;
    }


    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

    @Bean(name = "partitionReaderStep")
    public Step partitionReaderStep() throws Exception {

        return stepBuilderFactory.get("partitionReaderStep")
                .partitioner(workerStepReader().getName(),  partitioner( null))
                .step(workerStepReader())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

    @Bean(name = "partitionProcessorStep")
    public Step partitionProcessorStep() throws Exception {

        return stepBuilderFactory.get("partitionProcessorStep")
                .partitioner(workerStepProcessor().getName(), partitioner( null))
                .step(workerStepProcessor())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }


    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

        String step ="processor";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
            step = "reader";
        }

        // Use local build image
        DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));


        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the TaskExecution manually: this partition handler is no longer created at
        // the task level, so its @BeforeTask hook is never invoked.
        // DeployerPartitionHandler relies on the @BeforeTask annotation to receive the
        // TaskExecution object during task setup, but with the bean at @StepScope
        // (instead of a plain @Bean alongside @EnableTask) that setup never runs,
        // leaving the created DeployerPartitionHandler with a null TaskExecution.

        // The workaround below uses the current job execution id to look up the
        // associated task execution id, fetches that TaskExecution, and passes it to
        // the handler to satisfy its need for a TaskExecution reference.
        // It seems to work, but it is unclear whether there are side effects
        // (none found so far during testing).
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);

        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        partitionHandler.setApplicationName(taskName_prefix + step);

        return partitionHandler;
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");

            // For now using the same image for reader and processor; if it works, they can be split
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");

            // For now using the same image for reader and processor; if it works, they can be split
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean(name = "workerStepReader")
    public Step workerStepReader() {
        return this.stepBuilderFactory.get("workerStepReader")
                .tasklet(workerTaskletReader(null))
                .build();
    }

    @Bean(name = "workerStepProcessor")
    public Step workerStepProcessor() {
        return this.stepBuilderFactory.get("workerStepProcessor")
                .tasklet(workerTaskletProcessor(null))
                .build();
    }



    @Bean
    @StepScope
    public Tasklet workerTaskletReader(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletReader ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    @StepScope
    public Tasklet workerTaskletProcessor(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }
}
