diff --git a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/CredentialRequest.java b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/CredentialRequest.java index 6d0b427a..bf866739 100644 --- a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/CredentialRequest.java +++ b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/CredentialRequest.java @@ -23,7 +23,6 @@ @EqualsAndHashCode(callSuper = true) @Data @ToString -@EqualsAndHashCode(callSuper = false) public class CredentialRequest extends PageRequest { private ApiCredentialDO condition = new ApiCredentialDO(true); } diff --git a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/JupyterRequest.java b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/JupyterRequest.java index fdf08f31..2bb577f3 100644 --- a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/JupyterRequest.java +++ b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/request/JupyterRequest.java @@ -23,7 +23,6 @@ @EqualsAndHashCode(callSuper = true) @Data @ToString -@EqualsAndHashCode(callSuper = false) public class JupyterRequest extends PageRequest { private JupyterInfoDO condition = new JupyterInfoDO(true); } diff --git a/wedpr-components/admin/src/main/java/com/webank/wedpr/components/admin/response/JobReportResponse.java b/wedpr-components/admin/src/main/java/com/webank/wedpr/components/admin/response/JobReportResponse.java new file mode 100644 index 00000000..b6790026 --- /dev/null +++ b/wedpr-components/admin/src/main/java/com/webank/wedpr/components/admin/response/JobReportResponse.java @@ -0,0 +1,12 @@ +package com.webank.wedpr.components.admin.response; + +import java.util.List; +import lombok.Data; + +/** Created by caryliao on 2024/9/5 11:28 */ +@Data +public class JobReportResponse { + private Integer code; + private String msg; + private List jobIdList; +} diff --git a/wedpr-components/env-integration/jupyter/src/main/java/com/webank/wedpr/components/integration/jupyter/dao/JupyterInfoDO.java b/wedpr-components/env-integration/jupyter/src/main/java/com/webank/wedpr/components/integration/jupyter/dao/JupyterInfoDO.java index ba25f088..7b316414 100644 --- a/wedpr-components/env-integration/jupyter/src/main/java/com/webank/wedpr/components/integration/jupyter/dao/JupyterInfoDO.java +++ b/wedpr-components/env-integration/jupyter/src/main/java/com/webank/wedpr/components/integration/jupyter/dao/JupyterInfoDO.java @@ -26,7 +26,6 @@ @EqualsAndHashCode(callSuper = true) @Data @ToString -@EqualsAndHashCode(callSuper = false) public class JupyterInfoDO extends TimeRange { private String id = WeDPRUuidGenerator.generateID(); private String owner; diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java index 3282ea47..606f887a 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java @@ -104,6 +104,10 @@ public String toString() { + ", result='" + result + '\'' + + ", startTime=" + + startTime + + ", timeCostMs=" + + timeCostMs + '}'; } } @@ -196,6 +200,7 @@ public String toString() { private String parties; // the job type private String jobType; + private Integer reportStatus; @JsonIgnore private transient JobType type; @JsonIgnore private transient JobType originalJobType; @@ -301,6 +306,14 @@ public String getResult() { return result; } + public Integer getReportStatus() { + return reportStatus; + } + + public void setReportStatus(Integer reportStatus) { + this.reportStatus = reportStatus; + } + public void setResult(String result) { this.result = result; this.jobResult = JobResult.deserialize(result); diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java index 52fd9755..567e7286 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java @@ -27,6 +27,8 @@ public interface ProjectMapper { // update the project information public int updateProjectInfo( @Param("owner") String owner, @Param("projectDO") ProjectDO projectDO); + + public int batchUpdateProjectInfo(@Param("projectDOList") List projectDOList); // delete projects public int deleteProjects( @Param("owner") String owner, @Param("projectList") List projectList); @@ -62,8 +64,4 @@ public List queryFollowerJobByCondition( @Param("condition") JobDO condition); public List queryJobsByDatasetID(@Param("datasetID") String datasetID); - - List queryProjectForAdmin(); - - ProjectDO queryProjectById(@Param("id") String id); } diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java index 42ff7c2d..67e6d629 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java @@ -15,9 +15,7 @@ package com.webank.wedpr.components.project.service; -import com.webank.wedpr.components.project.dao.ProjectDO; import com.webank.wedpr.components.project.model.*; -import com.webank.wedpr.core.utils.WeDPRException; import com.webank.wedpr.core.utils.WeDPRResponse; import java.util.List; @@ -52,8 +50,4 @@ public abstract WeDPRResponse queryJobsByDatasetID( // job kill public abstract WeDPRResponse killJobs(String user, JobListRequest request); - - List queryProjectForReport() throws WeDPRException; - - ProjectDO queryProjectById(String id); } diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java index 0072b918..7e8ef565 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java @@ -479,24 +479,6 @@ public WeDPRResponse killJobs(String user, JobListRequest request) { user, WeDPRCommonConfig.getAgency(), request.getJobs(), JobStatus.WaitToKill); } - @Override - public List queryProjectForReport() throws WeDPRException { - PageRequest pageRequest = new PageRequest(); - pageRequest.setPageNum(Constant.DEFAULT_PAGE_NUM); - pageRequest.setPageSize(Constant.DEFAULT_REPORT_PAGE_SIZE); - try (PageHelperWrapper pageHelperWrapper = new PageHelperWrapper(pageRequest)) { - return this.projectMapperWrapper.getProjectMapper().queryProjectForAdmin(); - } catch (Exception e) { - logger.warn("queryProjectForAdmin exception, error: ", e); - throw new WeDPRException(e); - } - } - - @Override - public ProjectDO queryProjectById(String id) { - return this.projectMapperWrapper.getProjectMapper().queryProjectById(id); - } - @Override public Object queryJobOverview(String user, JobOverviewRequest jobOverviewRequest) throws Exception { diff --git a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml index e36b50b8..298a92ae 100644 --- a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml +++ b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml @@ -62,12 +62,26 @@ `label` = #{projectDO.label}, + + `report_status` = #{projectDO.reportStatus}, + where `id` = #{projectDO.id} and `owner` = #{owner} + + + update `wedpr_project_table` + + + `report_status` = #{project.reportStatus}, + + + where `id` = #{project.id} + + delete from `wedpr_project_table` @@ -108,6 +122,9 @@ and `owner_agency` = #{condition.ownerAgency} + + and `report_status` = #{condition.reportStatus} + = 0]]> @@ -119,10 +136,6 @@ order by `last_update_time` desc - - - - - update `wedpr_job_table` @@ -377,6 +388,9 @@ `job_result` = #{job.result}, + + `report_status` = #{job.reportStatus}, + where `id` = #{job.id} diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportMessageHandler.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportMessageHandler.java new file mode 100644 index 00000000..d5555ab1 --- /dev/null +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportMessageHandler.java @@ -0,0 +1,48 @@ +package com.webank.wedpr.components.report.handler; + +import com.webank.wedpr.components.project.dao.JobDO; +import com.webank.wedpr.components.project.dao.ProjectMapper; +import com.webank.wedpr.components.transport.Transport; +import com.webank.wedpr.components.transport.model.Message; +import com.webank.wedpr.core.utils.Constant; +import com.webank.wedpr.core.utils.ObjectMapperFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; + +/** Created by caryliao on 2024/9/4 10:54 */ +@Slf4j +public class JobReportMessageHandler implements Transport.MessageHandler { + private ProjectMapper projectMapper; + + public JobReportMessageHandler(ProjectMapper projectMapper) { + this.projectMapper = projectMapper; + } + + @Override + public void call(Message msg) { + byte[] payload = msg.getPayload(); + try { + JobReportResponse jobReportResponse = + ObjectMapperFactory.getObjectMapper() + .readValue(payload, JobReportResponse.class); + if (Constant.WEDPR_SUCCESS == jobReportResponse.getCode()) { + // report ok ,then set report status to 1 + List jobIdList = jobReportResponse.getJobIdList(); + ArrayList jobDOList = new ArrayList<>(); + for (String jobId : jobIdList) { + JobDO jobDO = new JobDO(); + jobDO.setId(jobId); + jobDO.setReportStatus(1); + jobDOList.add(jobDO); + } + projectMapper.batchUpdateJobInfo(jobDOList); + } else { + log.warn("report job error:{}", jobReportResponse); + } + } catch (IOException e) { + log.warn("handle JobReportResponse error", e); + } + } +} diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportResponse.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportResponse.java new file mode 100644 index 00000000..f0dc2465 --- /dev/null +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/JobReportResponse.java @@ -0,0 +1,14 @@ +package com.webank.wedpr.components.report.handler; + +import java.util.List; +import lombok.Data; +import lombok.ToString; + +/** Created by caryliao on 2024/9/5 11:28 */ +@Data +@ToString +public class JobReportResponse { + private Integer code; + private String msg; + private List jobIdList; +} diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportMessageHandler.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportMessageHandler.java index a8a8bc98..a297252c 100644 --- a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportMessageHandler.java +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportMessageHandler.java @@ -1,13 +1,49 @@ package com.webank.wedpr.components.report.handler; +import com.webank.wedpr.components.project.dao.ProjectDO; +import com.webank.wedpr.components.project.dao.ProjectMapper; import com.webank.wedpr.components.transport.Transport; import com.webank.wedpr.components.transport.model.Message; +import com.webank.wedpr.core.utils.Constant; +import com.webank.wedpr.core.utils.ObjectMapperFactory; import com.webank.wedpr.core.utils.WeDPRException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; /** Created by caryliao on 2024/9/4 10:54 */ +@Slf4j public class ProjectReportMessageHandler implements Transport.MessageHandler { + private ProjectMapper projectMapper; + + public ProjectReportMessageHandler(ProjectMapper projectMapper) { + this.projectMapper = projectMapper; + } + @Override public void call(Message msg) throws WeDPRException { byte[] payload = msg.getPayload(); + try { + ProjectReportResponse projectReportResponse = + ObjectMapperFactory.getObjectMapper() + .readValue(payload, ProjectReportResponse.class); + if (Constant.WEDPR_SUCCESS == projectReportResponse.getCode()) { + // report ok ,then set report status to 1 + List projectIdList = projectReportResponse.getProjectIdList(); + ArrayList projectDOList = new ArrayList<>(); + for (String projectId : projectIdList) { + ProjectDO projectDO = new ProjectDO(); + projectDO.setId(projectId); + projectDO.setReportStatus(1); + projectDOList.add(projectDO); + } + projectMapper.batchUpdateProjectInfo(projectDOList); + } else { + log.warn("report project error:{}", projectReportResponse); + } + } catch (IOException e) { + log.warn("handle ProjectReportResponse error", e); + } } } diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportResponse.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportResponse.java new file mode 100644 index 00000000..84fb0b48 --- /dev/null +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/handler/ProjectReportResponse.java @@ -0,0 +1,14 @@ +package com.webank.wedpr.components.report.handler; + +import java.util.List; +import lombok.Data; +import lombok.ToString; + +/** Created by caryliao on 2024/9/5 11:28 */ +@Data +@ToString +public class ProjectReportResponse { + private Integer code; + private String msg; + private List projectIdList; +} diff --git a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java index 1fe97ded..00e2efd3 100644 --- a/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java +++ b/wedpr-components/report/src/main/java/com/webank/wedpr/components/report/job/ReportQuartzJob.java @@ -2,14 +2,15 @@ package com.webank.wedpr.components.report.job; import com.fasterxml.jackson.core.JsonProcessingException; +import com.webank.wedpr.components.project.dao.JobDO; import com.webank.wedpr.components.project.dao.ProjectDO; -import com.webank.wedpr.components.project.service.ProjectService; +import com.webank.wedpr.components.project.dao.ProjectMapper; +import com.webank.wedpr.components.report.handler.JobReportMessageHandler; import com.webank.wedpr.components.report.handler.ProjectReportMessageHandler; import com.webank.wedpr.components.transport.Transport; import com.webank.wedpr.core.config.WeDPRCommonConfig; import com.webank.wedpr.core.protocol.TransportTopicEnum; import com.webank.wedpr.core.utils.ObjectMapperFactory; -import com.webank.wedpr.core.utils.WeDPRException; import java.time.LocalDateTime; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -22,7 +23,7 @@ @DisallowConcurrentExecution @Slf4j public class ReportQuartzJob implements Job { - @Autowired private ProjectService projectService; + @Autowired private ProjectMapper projectMapper; private Transport transport; @@ -42,14 +43,34 @@ private void doReport() { try { String agency = WeDPRCommonConfig.getAgency(); reportProjectInfo(agency); + reportJobInfo(agency); } catch (Exception e) { - throw new RuntimeException("report error", e); + log.warn("report error", e); } } - private void reportProjectInfo(String agency) throws WeDPRException, JsonProcessingException { - ProjectReportMessageHandler projectReportMessageHandler = new ProjectReportMessageHandler(); - List projectDOList = projectService.queryProjectForReport(); + private void reportJobInfo(String agency) throws JsonProcessingException { + JobReportMessageHandler jobReportMessageHandler = + new JobReportMessageHandler(projectMapper); + JobDO jobDO = new JobDO(); + jobDO.setReportStatus(0); + List jobDOList = projectMapper.queryJobs(false, jobDO, null); + byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(jobDOList); + transport.asyncSendMessageByAgency( + TransportTopicEnum.JOB_REPORT.name(), + agency, + payload, + 0, + WeDPRCommonConfig.getReportTimeout(), + jobReportMessageHandler); + } + + private void reportProjectInfo(String agency) throws JsonProcessingException { + ProjectReportMessageHandler projectReportMessageHandler = + new ProjectReportMessageHandler(projectMapper); + ProjectDO projectDO = new ProjectDO(); + projectDO.setReportStatus(0); + List projectDOList = projectMapper.queryProject(false, projectDO); byte[] payload = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(projectDOList); transport.asyncSendMessageByAgency( TransportTopicEnum.PROJECT_REPORT.name(),