Skip to content

Commit

Permalink
Merge pull request #41 from fqliao/feature-milestone2
Browse files Browse the repository at this point in the history
add report job for admin
  • Loading branch information
fqliao authored Sep 6, 2024
2 parents 34fbc9e + d3f96c6 commit f52d150
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
@EqualsAndHashCode(callSuper = true)
@Data
@ToString
@EqualsAndHashCode(callSuper = false)
public class CredentialRequest extends PageRequest {
private ApiCredentialDO condition = new ApiCredentialDO(true);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
@EqualsAndHashCode(callSuper = true)
@Data
@ToString
@EqualsAndHashCode(callSuper = false)
public class JupyterRequest extends PageRequest {
private JupyterInfoDO condition = new JupyterInfoDO(true);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.webank.wedpr.components.admin.response;

import java.util.List;
import lombok.Data;

/** Created by caryliao on 2024/9/5 11:28 */
@Data
public class JobReportResponse {
private Integer code;
private String msg;
private List<String> jobIdList;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
@EqualsAndHashCode(callSuper = true)
@Data
@ToString
@EqualsAndHashCode(callSuper = false)
public class JupyterInfoDO extends TimeRange {
private String id = WeDPRUuidGenerator.generateID();
private String owner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public String toString() {
+ ", result='"
+ result
+ '\''
+ ", startTime="
+ startTime
+ ", timeCostMs="
+ timeCostMs
+ '}';
}
}
Expand Down Expand Up @@ -196,6 +200,7 @@ public String toString() {
private String parties;
// the job type
private String jobType;
private Integer reportStatus;

@JsonIgnore private transient JobType type;
@JsonIgnore private transient JobType originalJobType;
Expand Down Expand Up @@ -301,6 +306,14 @@ public String getResult() {
return result;
}

public Integer getReportStatus() {
return reportStatus;
}

public void setReportStatus(Integer reportStatus) {
this.reportStatus = reportStatus;
}

public void setResult(String result) {
this.result = result;
this.jobResult = JobResult.deserialize(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface ProjectMapper {
// update the project information
public int updateProjectInfo(
@Param("owner") String owner, @Param("projectDO") ProjectDO projectDO);

public int batchUpdateProjectInfo(@Param("projectDOList") List<ProjectDO> projectDOList);
// delete projects
public int deleteProjects(
@Param("owner") String owner, @Param("projectList") List<String> projectList);
Expand Down Expand Up @@ -62,8 +64,4 @@ public List<JobDO> queryFollowerJobByCondition(
@Param("condition") JobDO condition);

public List<JobDO> queryJobsByDatasetID(@Param("datasetID") String datasetID);

List<ProjectDO> queryProjectForAdmin();

ProjectDO queryProjectById(@Param("id") String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

package com.webank.wedpr.components.project.service;

import com.webank.wedpr.components.project.dao.ProjectDO;
import com.webank.wedpr.components.project.model.*;
import com.webank.wedpr.core.utils.WeDPRException;
import com.webank.wedpr.core.utils.WeDPRResponse;
import java.util.List;

Expand Down Expand Up @@ -52,8 +50,4 @@ public abstract WeDPRResponse queryJobsByDatasetID(

// job kill
public abstract WeDPRResponse killJobs(String user, JobListRequest request);

List<ProjectDO> queryProjectForReport() throws WeDPRException;

ProjectDO queryProjectById(String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,24 +479,6 @@ public WeDPRResponse killJobs(String user, JobListRequest request) {
user, WeDPRCommonConfig.getAgency(), request.getJobs(), JobStatus.WaitToKill);
}

@Override
public List<ProjectDO> queryProjectForReport() throws WeDPRException {
PageRequest pageRequest = new PageRequest();
pageRequest.setPageNum(Constant.DEFAULT_PAGE_NUM);
pageRequest.setPageSize(Constant.DEFAULT_REPORT_PAGE_SIZE);
try (PageHelperWrapper pageHelperWrapper = new PageHelperWrapper(pageRequest)) {
return this.projectMapperWrapper.getProjectMapper().queryProjectForAdmin();
} catch (Exception e) {
logger.warn("queryProjectForAdmin exception, error: ", e);
throw new WeDPRException(e);
}
}

@Override
public ProjectDO queryProjectById(String id) {
return this.projectMapperWrapper.getProjectMapper().queryProjectById(id);
}

@Override
public Object queryJobOverview(String user, JobOverviewRequest jobOverviewRequest)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,26 @@
<if test="projectDO.label != null and projectDO.label !=''">
`label` = #{projectDO.label},
</if>
<if test="projectDO.reportStatus != null">
`report_status` = #{projectDO.reportStatus},
</if>
</set>
where `id` = #{projectDO.id}
<if test="owner != null and owner !=''">
and `owner` = #{owner}
</if>
</update>
<update id="batchUpdateProjectInfo" parameterType="java.util.List">
<foreach collection="projectDOList" item = "project" separator=";">
update `wedpr_project_table`
<set>
<if test="project.reportStatus != null">
`report_status` = #{project.reportStatus},
</if>
</set>
where `id` = #{project.id}
</foreach>
</update>
<delete id="deleteProjects" parameterType="java.util.List">
delete from `wedpr_project_table`
<foreach collection="projectList" item = "projectID" index = "index" separator=",">
Expand Down Expand Up @@ -108,6 +122,9 @@
<if test="condition.ownerAgency != null and condition.ownerAgency !=''">
and `owner_agency` = #{condition.ownerAgency}
</if>
<if test="condition.reportStatus != null">
and `report_status` = #{condition.reportStatus}
</if>
<if test="condition.startTime != null and condition.startTime != ''">
<![CDATA[ and datediff(create_time, #{condition.startTime}) >= 0]]>
</if>
Expand All @@ -119,10 +136,6 @@
order by `last_update_time` desc
</select>

<select id="queryProjectForAdmin" resultMap="ProjectDOMap">
select * from `wedpr_project_table` where `report_status` = 0
</select>

<select id="queryProjectCount" resultType="java.lang.Long">
select count(1) from `wedpr_project_table` where 1 = 1
<choose>
Expand Down Expand Up @@ -199,6 +212,9 @@
<if test="condition.jobType != null and condition.jobType !=''">
and `job_type` = #{condition.jobType}
</if>
<if test="condition.reportStatus != null">
and `report_status` = #{condition.reportStatus}
</if>
<if test="condition.startTime != null and condition.startTime != ''">
<![CDATA[ and datediff(create_time, #{condition.startTime}) >= 0]]>
</if>
Expand Down Expand Up @@ -359,11 +375,6 @@
on job_table.id = follower_table.resource_id where job_table.owner is not NULL
</select>

<select id="queryProjectById" resultMap="ProjectDOMap">
select * from `wedpr_project_table` where `id` = #{id}
</select>


<update id="batchUpdateJobInfo" parameterType="java.util.List">
<foreach collection="jobDOList" item = "job" separator=";">
update `wedpr_job_table`
Expand All @@ -377,6 +388,9 @@
<if test="job.result != null and job.result !=''">
`job_result` = #{job.result},
</if>
<if test="job.reportStatus != null">
`report_status` = #{job.reportStatus},
</if>
</set>
where `id` = #{job.id}
<if test="job.owner != null and job.owner !=''">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.webank.wedpr.components.report.handler;

import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.project.dao.ProjectMapper;
import com.webank.wedpr.components.transport.Transport;
import com.webank.wedpr.components.transport.model.Message;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/** Created by caryliao on 2024/9/4 10:54 */
@Slf4j
public class JobReportMessageHandler implements Transport.MessageHandler {
private ProjectMapper projectMapper;

public JobReportMessageHandler(ProjectMapper projectMapper) {
this.projectMapper = projectMapper;
}

@Override
public void call(Message msg) {
byte[] payload = msg.getPayload();
try {
JobReportResponse jobReportResponse =
ObjectMapperFactory.getObjectMapper()
.readValue(payload, JobReportResponse.class);
if (Constant.WEDPR_SUCCESS == jobReportResponse.getCode()) {
// report ok ,then set report status to 1
List<String> jobIdList = jobReportResponse.getJobIdList();
ArrayList<JobDO> jobDOList = new ArrayList<>();
for (String jobId : jobIdList) {
JobDO jobDO = new JobDO();
jobDO.setId(jobId);
jobDO.setReportStatus(1);
jobDOList.add(jobDO);
}
projectMapper.batchUpdateJobInfo(jobDOList);
} else {
log.warn("report job error:{}", jobReportResponse);
}
} catch (IOException e) {
log.warn("handle JobReportResponse error", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.webank.wedpr.components.report.handler;

import java.util.List;
import lombok.Data;
import lombok.ToString;

/** Created by caryliao on 2024/9/5 11:28 */
@Data
@ToString
public class JobReportResponse {
private Integer code;
private String msg;
private List<String> jobIdList;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
package com.webank.wedpr.components.report.handler;

import com.webank.wedpr.components.project.dao.ProjectDO;
import com.webank.wedpr.components.project.dao.ProjectMapper;
import com.webank.wedpr.components.transport.Transport;
import com.webank.wedpr.components.transport.model.Message;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import com.webank.wedpr.core.utils.WeDPRException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/** Created by caryliao on 2024/9/4 10:54 */
@Slf4j
public class ProjectReportMessageHandler implements Transport.MessageHandler {
private ProjectMapper projectMapper;

public ProjectReportMessageHandler(ProjectMapper projectMapper) {
this.projectMapper = projectMapper;
}

@Override
public void call(Message msg) throws WeDPRException {
byte[] payload = msg.getPayload();
try {
ProjectReportResponse projectReportResponse =
ObjectMapperFactory.getObjectMapper()
.readValue(payload, ProjectReportResponse.class);
if (Constant.WEDPR_SUCCESS == projectReportResponse.getCode()) {
// report ok ,then set report status to 1
List<String> projectIdList = projectReportResponse.getProjectIdList();
ArrayList<ProjectDO> projectDOList = new ArrayList<>();
for (String projectId : projectIdList) {
ProjectDO projectDO = new ProjectDO();
projectDO.setId(projectId);
projectDO.setReportStatus(1);
projectDOList.add(projectDO);
}
projectMapper.batchUpdateProjectInfo(projectDOList);
} else {
log.warn("report project error:{}", projectReportResponse);
}
} catch (IOException e) {
log.warn("handle ProjectReportResponse error", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.webank.wedpr.components.report.handler;

import java.util.List;
import lombok.Data;
import lombok.ToString;

/** Created by caryliao on 2024/9/5 11:28 */
@Data
@ToString
public class ProjectReportResponse {
private Integer code;
private String msg;
private List<String> projectIdList;
}
Loading

0 comments on commit f52d150

Please sign in to comment.