Skip to content

Commit

Permalink
Added query for composed-task-runner status. (#5792)
Browse files Browse the repository at this point in the history
* Added query for composed-task-runner status.
#Fixes 5782

* Added index on task_execution parent_execution_id.
Added check for taskExecutionStatus on task execution list.

* Fix ctr status query.

* Reduce minimum improvement to 2 times.
  • Loading branch information
corneil authored May 6, 2024
1 parent 872ee4c commit 6f97589
Show file tree
Hide file tree
Showing 29 changed files with 241 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ public AggregateTaskExecution mapRow(ResultSet rs, int rowNum) throws SQLExcepti
rs.getString("EXTERNAL_EXECUTION_ID"),
parentExecutionId,
null,
null,
schemaTarget
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public AggregateTaskExecution from(TaskExecution execution, String schemaTarget,
execution.getExternalExecutionId(),
execution.getParentExecutionId(),
platformName,
null,
schemaTarget);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class TaskExecutionThinResource extends RepresentationModel<TaskExecution

private String taskExecutionStatus;

private String composedTaskJobExecutionStatus;

/**
* @since 2.11.0
*/
Expand All @@ -94,6 +96,7 @@ public TaskExecutionThinResource(AggregateTaskExecution aggregateTaskExecution)
this.exitCode = aggregateTaskExecution.getExitCode();
this.exitMessage = aggregateTaskExecution.getExitMessage();
this.errorMessage = aggregateTaskExecution.getErrorMessage();
this.composedTaskJobExecutionStatus = aggregateTaskExecution.getCtrTaskStatus();
}

public long getExecutionId() {
Expand Down Expand Up @@ -187,6 +190,14 @@ public void setTaskExecutionStatus(String taskExecutionStatus) {
this.taskExecutionStatus = taskExecutionStatus;
}

public String getComposedTaskJobExecutionStatus() {
return composedTaskJobExecutionStatus;
}

public void setComposedTaskJobExecutionStatus(String composedTaskJobExecutionStatus) {
this.composedTaskJobExecutionStatus = composedTaskJobExecutionStatus;
}

/**
* Returns the calculated status of this {@link TaskExecution}.
*
Expand All @@ -211,7 +222,12 @@ public TaskExecutionStatus getTaskExecutionStatus() {
if (this.endTime == null) {
return TaskExecutionStatus.RUNNING;
}

if (this.composedTaskJobExecutionStatus != null) {
return (this.composedTaskJobExecutionStatus.equals("ABANDONED") ||
this.composedTaskJobExecutionStatus.equals("FAILED") ||
this.composedTaskJobExecutionStatus.equals("STOPPED")) ?
TaskExecutionStatus.ERROR : TaskExecutionStatus.COMPLETE;
}
return (this.exitCode == null) ? TaskExecutionStatus.RUNNING :
((this.exitCode == 0) ? TaskExecutionStatus.COMPLETE : TaskExecutionStatus.ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class AggregateTaskExecution {
private String schemaTarget;

private String platformName;

private String ctrTaskStatus;
/**
* The arguments that were used for this task execution.
*/
Expand All @@ -92,7 +94,8 @@ public AggregateTaskExecution() {

public AggregateTaskExecution(long executionId, Integer exitCode, String taskName,
Date startTime, Date endTime, String exitMessage, List<String> arguments,
String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName, String schemaTarget) {
String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName,
String ctrTaskStatus, String schemaTarget) {

Assert.notNull(arguments, "arguments must not be null");
this.executionId = executionId;
Expand All @@ -107,14 +110,15 @@ public AggregateTaskExecution(long executionId, Integer exitCode, String taskNam
this.parentExecutionId = parentExecutionId;
this.schemaTarget = schemaTarget;
this.platformName = platformName;
this.ctrTaskStatus = ctrTaskStatus;
}

public AggregateTaskExecution(long executionId, Integer exitCode, String taskName,
Date startTime, Date endTime, String exitMessage, List<String> arguments,
String errorMessage, String externalExecutionId, String platformName, String schemaTarget) {
String errorMessage, String externalExecutionId, String platformName, String ctrTaskStatus, String schemaTarget) {

this(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments,
errorMessage, externalExecutionId, null, platformName, schemaTarget);
errorMessage, externalExecutionId, null, platformName, ctrTaskStatus, schemaTarget);
}

public long getExecutionId() {
Expand Down Expand Up @@ -209,22 +213,31 @@ public void setPlatformName(String platformName) {
this.platformName = platformName;
}

public String getCtrTaskStatus() {
return ctrTaskStatus;
}

public void setCtrTaskStatus(String ctrTaskStatus) {
this.ctrTaskStatus = ctrTaskStatus;
}

@Override
public String toString() {
return "AggregateTaskExecution{" +
"executionId=" + executionId +
", parentExecutionId=" + parentExecutionId +
", exitCode=" + exitCode +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", exitMessage='" + exitMessage + '\'' +
", externalExecutionId='" + externalExecutionId + '\'' +
", errorMessage='" + errorMessage + '\'' +
", schemaTarget='" + schemaTarget + '\'' +
", platformName='" + platformName + '\'' +
", arguments=" + arguments +
'}';
"executionId=" + executionId +
", parentExecutionId=" + parentExecutionId +
", exitCode=" + exitCode +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", exitMessage='" + exitMessage + '\'' +
", externalExecutionId='" + externalExecutionId + '\'' +
", errorMessage='" + errorMessage + '\'' +
", schemaTarget='" + schemaTarget + '\'' +
", platformName='" + platformName + '\'' +
", ctrTaskStatus='" + ctrTaskStatus + '\'' +
", arguments=" + arguments +
'}';
}

public TaskExecution toTaskExecution() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ public TaskExecutionController taskExecutionController(
}

@Bean
public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer) {
return new TaskExecutionThinController(aggregateTaskExplorer);
public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer, TaskJobService taskJobService) {
return new TaskExecutionThinController(aggregateTaskExplorer, taskJobService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.server.service.TaskJobService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.hateoas.PagedModel;
Expand All @@ -26,6 +28,8 @@
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

Expand All @@ -44,15 +48,29 @@ public class TaskExecutionThinController {
private final AggregateTaskExplorer explorer;
private final TaskExecutionThinResourceAssembler resourceAssembler;

public TaskExecutionThinController(AggregateTaskExplorer explorer) {
private final TaskJobService taskJobService;

public TaskExecutionThinController(AggregateTaskExplorer explorer, TaskJobService taskJobService) {
this.explorer = explorer;
this.taskJobService = taskJobService;
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
}

@GetMapping(produces = "application/json")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<AggregateTaskExecution> pagedAssembler) {
return pagedAssembler.toModel(explorer.findAll(pageable, true), resourceAssembler);
Page<AggregateTaskExecution> page = explorer.findAll(pageable, true);
taskJobService.populateComposeTaskRunnerStatus(page.getContent());
return pagedAssembler.toModel(page, resourceAssembler);
}

@RequestMapping(value = "", method = RequestMethod.GET, params = "name")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> retrieveTasksByName(@RequestParam("name") String taskName,
Pageable pageable, PagedResourcesAssembler<AggregateTaskExecution> pagedAssembler) {
Page<AggregateTaskExecution> page = this.explorer.findTaskExecutionsByName(taskName, pageable);
taskJobService.populateComposeTaskRunnerStatus(page.getContent());
return pagedAssembler.toModel(page, resourceAssembler);
}

static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<AggregateTaskExecution, TaskExecutionThinResource> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.db.migration;

import java.util.Arrays;
import java.util.List;

import org.springframework.cloud.dataflow.common.flyway.AbstractMigration;
import org.springframework.cloud.dataflow.common.flyway.SqlCommand;

/**
* Provide indexes to improve performance of finding child tasks.
* @author Corneil du Plessis
*/
public abstract class AbstractCreateTaskParentIndexMigration extends AbstractMigration {
protected static final String CREATE_TASK_PARENT_INDEX =
"create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID)";
protected static final String CREATE_BOOT3_TASK_PARENT_INDEX =
"create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID)";

public AbstractCreateTaskParentIndexMigration() {
super(null);
}

@Override
public List<SqlCommand> getCommands() {
return Arrays.asList(
SqlCommand.from(CREATE_TASK_PARENT_INDEX),
SqlCommand.from(CREATE_BOOT3_TASK_PARENT_INDEX)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.db2;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.mariadb;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.mysql;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.oracle;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.postgresql;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V13__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.sqlserver;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package org.springframework.cloud.dataflow.server.repository;


import java.util.Collection;
import java.util.Date;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobInstance;
Expand All @@ -26,6 +26,7 @@
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

Expand Down Expand Up @@ -60,4 +61,6 @@ public interface AggregateJobQueryDao {

JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException;

void populateCtrStatus(Collection<AggregateTaskExecution> aggregateTaskExecutions);

}
Loading

0 comments on commit 6f97589

Please sign in to comment.