diff --git a/src/main/java/fr/simplex_software/tests/partioned_job/config/JobConfig.java b/src/main/java/fr/simplex_software/tests/partioned_job/config/JobConfig.java index f862bbb..2d9bce3 100644 --- a/src/main/java/fr/simplex_software/tests/partioned_job/config/JobConfig.java +++ b/src/main/java/fr/simplex_software/tests/partioned_job/config/JobConfig.java @@ -4,6 +4,7 @@ import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.*; import org.springframework.batch.core.explore.*; +import org.springframework.batch.core.launch.support.*; import org.springframework.batch.core.partition.*; import org.springframework.batch.core.partition.support.*; import org.springframework.batch.core.repository.*; @@ -19,6 +20,7 @@ import org.springframework.context.annotation.*; import org.springframework.core.env.*; import org.springframework.core.io.*; +import org.springframework.util.*; import javax.sql.*; import java.util.*; @@ -37,21 +39,31 @@ public class JobConfig @Autowired public JobRepository jobRepository; @Autowired + public JobExplorer jobExplorer; + @Autowired private ConfigurableApplicationContext context; @Autowired private DelegatingResourceLoader resourceLoader; @Autowired private Environment environment; + @Autowired + public TaskExplorer taskExplorer; + @Autowired + public TaskLauncher taskLauncher; + @Autowired + public TaskRepository taskRepository; @Bean - public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) + @StepScope + public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, @Value("#{stepName}") String stepName, @Value("#{stepExecution}") StepExecution stepExecution) { - log.info ("### PartitionedJobApp.partitionHandler(): Entry tasklauncher {}, jobExplorer {}, taskRepository {}", taskLauncher, jobExplorer, taskRepository); Resource resource = this.resourceLoader .getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT"); - log.info ("### PartitionedJobApp.partitionHandler(): Got resource {}", resource); DeployerPartitionHandler partitionHandler = - new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep"); + new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, + /*stepExecution.getJobExecution().getExecutionContext().getString("workerStep")*/stepName, taskRepository); + TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId())); + partitionHandler.beforeTask(taskExecution); List commandLineArgs = new ArrayList<>(3); commandLineArgs.add("--spring.profiles.active=worker"); commandLineArgs.add("--spring.cloud.task.initialize-enabled=false"); @@ -62,14 +74,12 @@ public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(2); partitionHandler.setApplicationName("PartitionedBatchJobTask"); - log.info ("### PartitionedJobApp.partitionHandler(): Exit partitionHandler {}", partitionHandler); return partitionHandler; } @Bean public Partitioner partitioner() { - log.info ("### PartitionedJobApp.partitioner(): Entry"); return gridSize -> { Map partitions = new HashMap<>(gridSize); @@ -79,7 +89,6 @@ public Partitioner partitioner() context1.put("partitionNumber", i); partitions.put("partition" + i, context1); } - log.info ("### PartitionedJobApp.partitioner(): Exit partitions {}", partitions); return partitions; }; } @@ -88,8 +97,7 @@ public Partitioner partitioner() @Profile("worker") public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) { - log.info ("### PartitionedJobApp.stepExecutionHandler(): Entry jobExplorer {}", jobExplorer); - return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository); + return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository); } @Bean @@ -98,39 +106,35 @@ public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumb { return (contribution, chunkContext) -> { - log.info("### PartitionedJobApp.workerTasklet(): This tasklet ran partition: {}", partitionNumber); - return RepeatStatus.FINISHED; + return RepeatStatus.FINISHED; }; } @Bean - public Step step1(PartitionHandler partitionHandler) + public Step step1() { - log.info("### PartitionedJobApp.step1(): Entry"); return this.stepBuilderFactory.get("step1") .partitioner(workerStep().getName(), partitioner()) .step(workerStep()) - .partitionHandler(partitionHandler) + .partitionHandler(partitionHandler(taskLauncher, jobExplorer, "workerStep",null)) .build(); } @Bean public Step workerStep() { - log.info("### PartitionedJobApp.workerStep(): Entry"); - return this.stepBuilderFactory.get("workerStep") + return this.stepBuilderFactory.get("workerStep") .tasklet(workerTasklet(null)) .build(); } @Bean @Profile("!worker") - public Job partitionedJob(PartitionHandler partitionHandler) + public Job partitionedJob() { - log.info("### PartitionedJobApp.partitionedJob(): Entry partitionHandler {}", partitionHandler); - Random random = new Random(); - return this.jobBuilderFactory.get("partitionedJob" + random.nextInt()) - .start(step1(partitionHandler)) + return this.jobBuilderFactory.get("partitionedJob") + .incrementer(new RunIdIncrementer()) + .start(step1()) .build(); } }