Skip to content

Commit

Permalink
Have applied the danilko workaround
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasduminil committed Aug 10, 2021
1 parent 21975bc commit c664fb9
Showing 1 changed file with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.core.explore.*;
import org.springframework.batch.core.launch.support.*;
import org.springframework.batch.core.partition.*;
import org.springframework.batch.core.partition.support.*;
import org.springframework.batch.core.repository.*;
Expand All @@ -19,6 +20,7 @@
import org.springframework.context.annotation.*;
import org.springframework.core.env.*;
import org.springframework.core.io.*;
import org.springframework.util.*;

import javax.sql.*;
import java.util.*;
Expand All @@ -37,21 +39,31 @@ public class JobConfig
@Autowired
public JobRepository jobRepository;
@Autowired
public JobExplorer jobExplorer;
@Autowired
private ConfigurableApplicationContext context;
@Autowired
private DelegatingResourceLoader resourceLoader;
@Autowired
private Environment environment;
@Autowired
public TaskExplorer taskExplorer;
@Autowired
public TaskLauncher taskLauncher;
@Autowired
public TaskRepository taskRepository;

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository)
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, @Value("#{stepName}") String stepName, @Value("#{stepExecution}") StepExecution stepExecution)
{
log.info ("### PartitionedJobApp.partitionHandler(): Entry tasklauncher {}, jobExplorer {}, taskRepository {}", taskLauncher, jobExplorer, taskRepository);
Resource resource = this.resourceLoader
.getResource("maven://fr.simplex_software.tests:partitioned-job:1.0-SNAPSHOT");
log.info ("### PartitionedJobApp.partitionHandler(): Got resource {}", resource);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
/*stepExecution.getJobExecution().getExecutionContext().getString("workerStep")*/stepName, taskRepository);
TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
partitionHandler.beforeTask(taskExecution);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
Expand All @@ -62,14 +74,12 @@ public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer
.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
log.info ("### PartitionedJobApp.partitionHandler(): Exit partitionHandler {}", partitionHandler);
return partitionHandler;
}

@Bean
public Partitioner partitioner()
{
log.info ("### PartitionedJobApp.partitioner(): Entry");
return gridSize ->
{
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
Expand All @@ -79,7 +89,6 @@ public Partitioner partitioner()
context1.put("partitionNumber", i);
partitions.put("partition" + i, context1);
}
log.info ("### PartitionedJobApp.partitioner(): Exit partitions {}", partitions);
return partitions;
};
}
Expand All @@ -88,8 +97,7 @@ public Partitioner partitioner()
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer)
{
log.info ("### PartitionedJobApp.stepExecutionHandler(): Entry jobExplorer {}", jobExplorer);
return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}

@Bean
Expand All @@ -98,39 +106,35 @@ public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumb
{
return (contribution, chunkContext) ->
{
log.info("### PartitionedJobApp.workerTasklet(): This tasklet ran partition: {}", partitionNumber);
return RepeatStatus.FINISHED;
return RepeatStatus.FINISHED;
};
}

@Bean
public Step step1(PartitionHandler partitionHandler)
public Step step1()
{
log.info("### PartitionedJobApp.step1(): Entry");
return this.stepBuilderFactory.get("step1")
.partitioner(workerStep().getName(), partitioner())
.step(workerStep())
.partitionHandler(partitionHandler)
.partitionHandler(partitionHandler(taskLauncher, jobExplorer, "workerStep",null))
.build();
}

@Bean
public Step workerStep()
{
log.info("### PartitionedJobApp.workerStep(): Entry");
return this.stepBuilderFactory.get("workerStep")
return this.stepBuilderFactory.get("workerStep")
.tasklet(workerTasklet(null))
.build();
}

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler)
public Job partitionedJob()
{
log.info("### PartitionedJobApp.partitionedJob(): Entry partitionHandler {}", partitionHandler);
Random random = new Random();
return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(step1(partitionHandler))
return this.jobBuilderFactory.get("partitionedJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}

0 comments on commit c664fb9

Please sign in to comment.