Skip to content

Commit

Permalink
last task execution performance (#5843)
Browse files Browse the repository at this point in the history
* Change @nested to static since many tests were not execution.

* Improve performance of task definition list retrieving last task execution for each task.

Fixes #5841
  • Loading branch information
corneil authored Jun 26, 2024
1 parent a59fd98 commit 42b6346
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -254,21 +255,36 @@ public Set<Long> getJobExecutionIdsByTaskExecutionId(long taskExecutionId, Strin
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + schemaTarget);
return taskExplorer.getJobExecutionIdsByTaskExecutionId(taskExecutionId);
}

private static void add(Map<String, Set<String>> setMap, String key, String value) {
Set<String> set = setMap.computeIfAbsent(key, (v) -> new HashSet<>());
set.add(value);
}
@Override
public List<AggregateTaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames) {
List<AggregateTaskExecution> result = new ArrayList<>();
Map<String, Set<String>> targetToTaskNames = new HashMap<>();
Map<String, String> taskNamePlatform = new HashMap<>();
for (String taskName : taskNames) {
SchemaVersionTarget target = aggregateExecutionSupport.findSchemaVersionTarget(taskName, taskDefinitionReader);
String platformName = getPlatformName(taskName);
Assert.notNull(target, "Expected to find SchemaVersionTarget for " + taskName);
TaskExplorer taskExplorer = taskExplorers.get(target.getName());
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target.getName());
List<AggregateTaskExecution> taskExecutions = taskExplorer.getLatestTaskExecutionsByTaskNames(taskNames)
add(targetToTaskNames, target.getName(), taskName);
if(platformName != null) {
taskNamePlatform.put(taskName, platformName);
}
}
for(String target : targetToTaskNames.keySet()) {
Set<String> tasks = targetToTaskNames.get(target);
if(!tasks.isEmpty()) {
TaskExplorer taskExplorer = taskExplorers.get(target);
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target);
List<AggregateTaskExecution> taskExecutions = taskExplorer
.getLatestTaskExecutionsByTaskNames(tasks.toArray(new String[0]))
.stream()
.map(execution -> aggregateExecutionSupport.from(execution, target.getName(), platformName))
.map(execution -> aggregateExecutionSupport.from(execution, target, taskNamePlatform.get(execution.getTaskName())))
.collect(Collectors.toList());
result.addAll(taskExecutions);
result.addAll(taskExecutions);
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@
import org.springframework.cloud.dataflow.server.controller.JobStepExecutionProgressController;
import org.springframework.cloud.dataflow.server.controller.RestControllerAdvice;
import org.springframework.cloud.dataflow.server.controller.SchemaController;
import org.springframework.cloud.dataflow.server.controller.TaskDefinitionController;
import org.springframework.cloud.dataflow.server.controller.TaskExecutionController;
import org.springframework.cloud.dataflow.server.controller.TaskExecutionThinController;
import org.springframework.cloud.dataflow.server.controller.TaskLogsController;
import org.springframework.cloud.dataflow.server.controller.TaskPlatformController;
import org.springframework.cloud.dataflow.server.controller.TasksInfoController;
import org.springframework.cloud.dataflow.server.controller.assembler.DefaultTaskDefinitionAssemblerProvider;
import org.springframework.cloud.dataflow.server.controller.assembler.TaskDefinitionAssemblerProvider;
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
import org.springframework.cloud.dataflow.server.repository.DataflowJobExecutionDaoContainer;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionDaoContainer;
Expand Down Expand Up @@ -409,7 +412,28 @@ public TaskExecutionInfoService taskDefinitionRetriever(
composedTaskRunnerConfigurationProperties
);
}

@Bean
public TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider(
TaskExecutionService taskExecutionService,
TaskJobService taskJobService,
AggregateTaskExplorer taskExplorer,
AggregateExecutionSupport aggregateExecutionSupport
) {
return new DefaultTaskDefinitionAssemblerProvider(taskExecutionService, taskJobService, taskExplorer, aggregateExecutionSupport);
}
@Bean
public TaskDefinitionController taskDefinitionController(
AggregateTaskExplorer explorer, TaskDefinitionRepository repository,
TaskSaveService taskSaveService, TaskDeleteService taskDeleteService,
TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider
) {
return new TaskDefinitionController(explorer,
repository,
taskSaveService,
taskDeleteService,
taskDefinitionAssemblerProvider
);
}
@Bean
@Primary
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.config.DataFlowControllerAutoConfiguration;
import org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration;
import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties;
import org.springframework.cloud.dataflow.server.configuration.JobDependencies;
import org.springframework.cloud.dataflow.server.configuration.TestDependencies;
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
Expand Down Expand Up @@ -109,9 +111,13 @@
* @author Chris Bono
* @author Corneil du Plessis
*/
@SpringBootTest(
classes = { JobDependencies.class, TaskExecutionAutoConfiguration.class, DataflowAsyncAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class, BatchProperties.class})
@SpringBootTest(classes = {
JobDependencies.class,
TaskExecutionAutoConfiguration.class,
DataflowAsyncAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class,
BatchProperties.class
})
@EnableConfigurationProperties({CommonApplicationProperties.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@AutoConfigureTestDatabase(replace = Replace.ANY)
Expand Down Expand Up @@ -509,6 +515,13 @@ void getExecutionsByName() throws Exception {
.andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(2)));
}

@Test
void getDefinitionsWithLastExecution() throws Exception {
mockMvc.perform(get("/tasks/definitions").accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andDo(print())
.andExpect(jsonPath("$._embedded.taskDefinitionResourceList", hasSize(1)));
}
@Test
void getExecutionsByNameNotFound() throws Exception {
mockMvc.perform(get("/tasks/executions/").param("name", "BAZ").accept(MediaType.APPLICATION_JSON))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public abstract class DefaultTaskExecutionServiceTests {
ApplicationContext applicationContext;

@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests {
public static class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests {

@Autowired
DataSource dataSource;
Expand Down Expand Up @@ -295,9 +294,8 @@ public void setupTest(DataSource dataSource) {
}

@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
@TestPropertySource(properties = {"spring.cloud.dataflow.task.use-kubernetes-secrets-for-db-credentials=true"})
public class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests {
public static class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests {

@Autowired
DataSource dataSource;
Expand Down Expand Up @@ -333,8 +331,7 @@ public void executeSingleTaskDefaultsToExistingSinglePlatformTestForKubernetes()

@TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class CICDTaskTests extends DefaultTaskExecutionServiceTests {
public static class CICDTaskTests extends DefaultTaskExecutionServiceTests {

private Launcher launcher;

Expand Down Expand Up @@ -817,8 +814,7 @@ public void testUpgradeFailureTaskCurrentlyRunning() throws MalformedURLExceptio

@TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class SimpleTaskTests extends DefaultTaskExecutionServiceTests {
public static class SimpleTaskTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskDefinitionReader taskDefinitionReader;
Expand Down Expand Up @@ -1245,8 +1241,7 @@ public void validateNullResourceTaskTest() {

@TestPropertySource(properties = {"spring.cloud.dataflow.task.auto-create-task-definitions=true"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests {
public static class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskDefinitionRepository taskDefinitionRepository;
Expand All @@ -1272,8 +1267,7 @@ public void executeTaskWithNullDefinitionCreatesDefinitionIfConfigured() {

@TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class Boot3TaskTests extends DefaultTaskExecutionServiceTests {
public static class Boot3TaskTests extends DefaultTaskExecutionServiceTests {

public static final String TIMESTAMP_3 = "timestamp3";

Expand Down Expand Up @@ -1387,9 +1381,7 @@ public void launchBoot3WithVersion() throws IOException {
"spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere"
})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested

public class ComposedTaskTests extends DefaultTaskExecutionServiceTests {
public static class ComposedTaskTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskRepositoryContainer taskRepositoryContainer;
Expand Down Expand Up @@ -1962,8 +1954,7 @@ public void createDuplicateChildTaskComposedTask() {

@TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere", "spring.cloud.dataflow.task.useUserAccessToken=true"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests {
public static class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskRepositoryContainer taskRepositoryContainer;
Expand Down

0 comments on commit 42b6346

Please sign in to comment.