Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support remote scheduler service #46

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions wedpr-adm/conf/wedpr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ wedpr.executor.pir.connect.timeout.ms=5000
wedpr.executor.pir.request.timeout.ms=60000
wedpr.executor.pir.max.total.connection=5

# scheduler service config items
wedpr.remote.scheduler.enable=true
wedpr.remote.scheduler.max.total.connection=10
wedpr.remote.scheduler.url=http://127.0.0.1:43471
wedpr.remote.scheduler.api.path=/api/wedpr/v3/scheduler/job/

# the jupyter config
wedpr.jupyter.max_count_per_host=3
wedpr.jupyter.host_configuration_key=jupyter_entrypoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package com.webank.wedpr.adm.controller;

import com.webank.wedpr.components.scheduler.service.SchedulerService;
import com.webank.wedpr.components.scheduler.local.service.SchedulerService;
import com.webank.wedpr.components.token.auth.TokenUtils;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Dataset getDatasetByDatasetId(
* @return
*/
List<Dataset> queryVisibleDatasetsForUser(
@Param("loginUser") String loginUser,
@Param("loginAgency") String loginAgency,
@Param("loginUserSubject") String loginUserSubject,
@Param("loginUserGroupSubjectList") List<String> loginUserGroupSubjectList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ public ListDatasetResponse listDataset(

List<Dataset> datasetList =
datasetMapper.queryVisibleDatasetsForUser(
user,
agency,
userSubject,
userGroupSubjectList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,12 @@ public static UserInfo getUserInfo(
String[] splitStrings = token.split("\\|");

List<GroupInfo> groupInfoList = new ArrayList<>();
GroupInfo groupInfo = new GroupInfo();
groupInfo.setGroupId(splitStrings[2]);
groupInfo.setGroupName(splitStrings[2]);
groupInfoList.add(groupInfo);

UserInfo userInfo =
UserInfo.builder()
.role(splitStrings[0])
.agency(splitStrings[1])
.user(splitStrings[2])
.groupInfos(groupInfoList)
.user(splitStrings[3])
.build();

logger.info("debug model, try to solve use info from test field, user: {}", userInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@
<if test="ownerAgency != null and ownerAgency.trim() != ''">
AND t1.owner_agency_name = #{ownerAgency}
</if>

<if test="loginAgency != null and loginAgency.trim() != ''">
<if test="loginUser != null and loginUser.trim() != ''">
AND (
(
t1.owner_user_name = #{loginUser}
AND
t1.owner_agency_name = #{loginAgency})
OR
(
(
t1.owner_user_name != #{loginUser}
OR
t1.owner_agency_name != #{loginAgency}
)
AND
t1.status = 0
)
)
</if>
</if>

ORDER BY
t1.create_at DESC
</select>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
Expand Down Expand Up @@ -58,23 +59,23 @@ public BaseResponse executePost(BaseRequest request) throws Exception {
return factory.build(executePostAndGetString(request, null));
}

public String executePostAndGetString(BaseRequest request, Integer successCode)
public String executePostAndGetString(String url, String request, Integer successCode)
throws Exception {
StringEntity requestEntity = null;
CloseableHttpResponse response = null;
try {
CloseableHttpClient httpClient =
HttpClientPool.getHttpClient(this.maxConnTotal, this.requestConfig);
HttpPost httpPost = new HttpPost(this.url);
requestEntity = new StringEntity(request.serialize());
HttpPost httpPost = new HttpPost(url);
requestEntity = new StringEntity(request);
httpPost.setEntity(requestEntity);
httpPost.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
response = httpClient.execute(httpPost);
if (successCode != null) {
if (response.getStatusLine().getStatusCode() != successCode) {
throw new WeDPRException(
"send request: "
+ request.serialize()
+ request
+ " failed, status: "
+ response.getStatusLine().toString()
+ ", detail: "
Expand All @@ -84,15 +85,24 @@ public String executePostAndGetString(BaseRequest request, Integer successCode)
String result = EntityUtils.toString(response.getEntity());
logger.info(
"##### executePostAndGetString, request: {}, response: {}, response: {}",
request.serialize(),
request,
result,
response.toString());
response);
return result;
} finally {
releaseResource(response, requestEntity);
}
}

public String executePostAndGetString(String request, Integer successCode) throws Exception {
return executePostAndGetString(this.url, request, successCode);
}

public String executePostAndGetString(BaseRequest request, Integer successCode)
throws Exception {
return executePostAndGetString(request.serialize(), successCode);
}

private void releaseResource(CloseableHttpResponse response, StringEntity requestEntity)
throws Exception {
HttpClientPool.consume(requestEntity);
Expand All @@ -101,23 +111,44 @@ private void releaseResource(CloseableHttpResponse response, StringEntity reques
}
}

public BaseResponse execute(String url, boolean delete) throws Exception {
public String executeAndGetString(HttpRequestBase httpRequestBase) throws Exception {
CloseableHttpResponse response = null;
try {
CloseableHttpClient httpClient =
HttpClientPool.getHttpClient(this.maxConnTotal, this.requestConfig);
if (!delete) {
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
response = httpClient.execute(httpGet);
} else {
HttpDelete httpDelete = new HttpDelete(url);
httpDelete.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
response = httpClient.execute(httpDelete);
}

httpRequestBase.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
response = httpClient.execute(httpRequestBase);

return EntityUtils.toString(response.getEntity());
} finally {
releaseResource(response, null);
}
}

public BaseResponse execute(HttpRequestBase httpRequestBase) throws Exception {
CloseableHttpResponse response = null;
try {
CloseableHttpClient httpClient =
HttpClientPool.getHttpClient(this.maxConnTotal, this.requestConfig);

httpRequestBase.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
response = httpClient.execute(httpRequestBase);

return factory.build(EntityUtils.toString(response.getEntity()));
} finally {
releaseResource(response, null);
}
}

public BaseResponse execute(String url, boolean delete) throws Exception {
HttpRequestBase httpRequestBase = null;
if (delete) {
httpRequestBase = new HttpDelete(url);
} else {
httpRequestBase = new HttpGet(url);
}

return execute(httpRequestBase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public long queryFollowerJobCount(

public int insertJobInfo(@Param("jobDO") JobDO jobDO);

public int updateJobInfo(@Param("jobDO") JobDO jobDO);

public int batchUpdateJobInfo(@Param("jobDOList") List<JobDO> jobDOList);

public int batchInsertJobDatasetRelationInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public List<JobDO> queryJobDetail(String jobID, String user, String agency) {
return jobDOList;
}

@Transactional(rollbackFor = Exception.class)
public void batchUpdateJobStatus(
String user, String agency, List<JobDO> jobs, JobStatus status) {
if (jobs == null || jobs.isEmpty()) {
Expand All @@ -109,8 +110,15 @@ public void batchUpdateJobStatus(
updatedJob.setOwnerAgency(agency);
updatedJob.setReportStatus(ReportStatusEnum.NO_REPORT.getReportStatus());
updatedJobList.add(updatedJob);
this.projectMapper.updateJobInfo(updatedJob);
}
this.projectMapper.batchUpdateJobInfo(updatedJobList);

logger.info(
"batch update job status, user: {}, agency: {}, jobsSize: {}",
user,
agency,
updatedJobList.size());
// this.projectMapper.batchUpdateJobInfo(updatedJobList);
}

public WeDPRResponse updateJobStatus(
Expand Down Expand Up @@ -156,7 +164,7 @@ public void recordJobStatus(JobDO jobDO) {
public void insertJob(JobDO jobDO) {
String id = jobDO.getId();

logger.info(" => insert job, jobID: {}", id);
// logger.info(" ## insert job, jobID: {}", id);

this.projectMapper.insertJobInfo(jobDO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,12 @@ public WeDPRResponse submitJob(String user, JobRequest request) {
validateUserPermissionToDatasets(user, agency, datasetList);

this.projectMapperWrapper.insertJob(request.getJob());

String jobId = request.getJob().getId();

logger.info(
"submitJob, user: {}, agency: {}, datasetIds: {}, detail: {}",
"## submitJob, jobId: {}, user: {}, agency: {}, datasetIDs: {}, detail: {}",
jobId,
user,
agency,
datasetList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,31 @@
on job_table.id = follower_table.resource_id where job_table.owner is not NULL
</select>

<update id="updateJobInfo" parameterType="com.webank.wedpr.components.project.dao.JobDO">
update `wedpr_job_table`
<set>
<if test="jobDO.param != null and jobDO.param !=''">
`param` = #{jobDO.param},
</if>
<if test="jobDO.status != null and jobDO.status !=''">
`status` = #{jobDO.status},
</if>
<if test="jobDO.result != null and jobDO.result !=''">
`job_result` = #{jobDO.result},
</if>
<if test="jobDO.reportStatus != null">
`report_status` = #{jobDO.reportStatus},
</if>
</set>
where `id` = #{jobDO.id}
<if test="jobDO.owner != null and jobDO.owner !=''">
and `owner` = #{jobDO.owner}
</if>
<if test="jobDO.ownerAgency != null and jobDO.ownerAgency !=''">
and `owner_agency` = #{jobDO.ownerAgency}
</if>
</update>

<update id="batchUpdateJobInfo" parameterType="java.util.List">
<foreach collection="jobDOList" item = "job" separator=";">
update `wedpr_job_table`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.List;

public interface Scheduler {
public abstract void batchKillJobs(List<JobDO> jobs);
void batchKillJobs(List<JobDO> jobs);

public abstract void batchRunJobs(List<JobDO> jobs);
void batchRunJobs(List<JobDO> jobs);
}
Loading
Loading