Skip to content

Commit

Permalink
support get storage path by dataset id
Browse files Browse the repository at this point in the history
  • Loading branch information
ywy2090 committed Sep 11, 2024
1 parent 6fe4d97 commit 9ee461a
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,12 @@ public static UserInfo getUserInfo(
String[] splitStrings = token.split("\\|");

List<GroupInfo> groupInfoList = new ArrayList<>();
GroupInfo groupInfo = new GroupInfo();
groupInfo.setGroupId(splitStrings[2]);
groupInfo.setGroupName(splitStrings[2]);
groupInfoList.add(groupInfo);

UserInfo userInfo =
UserInfo.builder()
.role(splitStrings[0])
.agency(splitStrings[1])
.user(splitStrings[2])
.groupInfos(groupInfoList)
.user(splitStrings[3])
.build();

logger.info("debug model, try to solve use info from test field, user: {}", userInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.core.protocol.JobType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JobChecker {

public interface JobCheckHandler {
public Object checkAndParseParam(JobDO jobDO) throws Exception;
public Object checkAndParseParam(JobDO jobDO, List<String> datasetIdList) throws Exception;
}

private Map<String, JobCheckHandler> handlers = new HashMap<>();
Expand All @@ -36,10 +37,10 @@ public void registerJobCheckHandler(JobType jobType, JobCheckHandler handler) {
handlers.put(jobType.getType(), handler);
}

public Object checkAndParseParam(JobDO jobDO) throws Exception {
public Object checkAndParseParam(JobDO jobDO, List<String> datasetIdList) throws Exception {
if (!handlers.containsKey(jobDO.getType().getType())) {
return null;
}
return handlers.get(jobDO.getType().getType()).checkAndParseParam(jobDO);
return handlers.get(jobDO.getType().getType()).checkAndParseParam(jobDO, datasetIdList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public WeDPRResponse submitJob(String user, JobRequest request) {
request.getJob().checkCreate();
request.getJob().setDatasetList(datasetList);
// check the job param
jobChecker.checkAndParseParam(request.getJob());
jobChecker.checkAndParseParam(request.getJob(), datasetList);

request.getJob().setStatus(JobStatus.Submitted.getStatus());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.scheduler.config;

import com.webank.wedpr.components.dataset.service.DatasetServiceApi;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.storage.builder.StoragePathBuilder;
import com.webank.wedpr.components.storage.config.HdfsStorageConfig;
Expand All @@ -23,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -37,11 +39,16 @@ public class FileMetaBuilderConfig {
@Autowired private LocalStorageConfig localStorageConfig;
@Autowired private HdfsStorageConfig hdfsConfig;

@Qualifier("datasetService")
@Autowired
private DatasetServiceApi datasetService;

@Bean(name = "fileMetaBuilder")
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
@ConditionalOnMissingBean
public FileMetaBuilder fileMetaBuilder() throws Exception {

return new FileMetaBuilder(new StoragePathBuilder(hdfsConfig, localStorageConfig));
return new FileMetaBuilder(
new StoragePathBuilder(hdfsConfig, localStorageConfig), datasetService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutorParamChecker;
import com.webank.wedpr.core.protocol.JobType;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -60,8 +61,9 @@ public void registerPSIJobChecker(JobChecker jobChecker, FileMetaBuilder fileMet
jobType,
new JobChecker.JobCheckHandler() {
@Override
public Object checkAndParseParam(JobDO jobDO) throws Exception {
return psiExecutorParamChecker.checkAndParseJob(jobDO);
public Object checkAndParseParam(JobDO jobDO, List<String> datasetIdList)
throws Exception {
return psiExecutorParamChecker.checkAndParseJob(jobDO, datasetIdList);
}
});
}
Expand All @@ -75,8 +77,9 @@ public void registerMLJobChecker(JobChecker jobChecker, FileMetaBuilder fileMeta
jobType,
new JobChecker.JobCheckHandler() {
@Override
public Object checkAndParseParam(JobDO jobDO) throws Exception {
return mlExecutorParamChecker.checkAndParseJob(jobDO);
public Object checkAndParseParam(JobDO jobDO, List<String> datasetIdList)
throws Exception {
return mlExecutorParamChecker.checkAndParseJob(jobDO, datasetIdList);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.scheduler.config;

import com.webank.wedpr.components.dataset.service.DatasetServiceApi;
import com.webank.wedpr.components.project.JobChecker;
import com.webank.wedpr.components.project.dao.ProjectMapperWrapper;
import com.webank.wedpr.components.scheduler.SchedulerBuilder;
Expand Down Expand Up @@ -50,6 +51,10 @@ public class SchedulerLoader {
@Autowired
private FileStorageInterface storage;

@Qualifier("datasetService")
@Autowired
private DatasetServiceApi datasetService;

@Autowired private ResourceSyncer resourceSyncer;
@Autowired private JobChecker jobChecker;

Expand All @@ -62,7 +67,9 @@ public SchedulerTaskImpl schedulerTaskImpl() throws Exception {
projectMapperWrapper,
storage,
resourceSyncer,
new FileMetaBuilder(new StoragePathBuilder(hdfsConfig, localStorageConfig)),
new FileMetaBuilder(
new StoragePathBuilder(hdfsConfig, localStorageConfig),
datasetService),
jobChecker);
schedulerTask.start();
return schedulerTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private static FileMetaBuilder createFileMetaBuilder() {
hdfsStorageConfig.setBaseDir("/user/ppc/webank");
hdfsStorageConfig.setUser("ppc");
LocalStorageConfig localStorageConfig = new LocalStorageConfig();
return new FileMetaBuilder(new StoragePathBuilder(hdfsStorageConfig, localStorageConfig));
return new FileMetaBuilder(
new StoragePathBuilder(hdfsStorageConfig, localStorageConfig), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import lombok.SneakyThrows;

public class ExecuteResult {
public static enum ResultStatus {
public enum ResultStatus {
RUNNING("Running"),
SUCCESS("SUCCESS"),
FAILED("Failed");
Expand Down Expand Up @@ -80,6 +80,7 @@ public boolean finished() {
return this.resultStatus.finished();
}

// {"msg":"task is running","resultStatus":"SUCCESS"}
@SneakyThrows(Exception.class)
public String serialize() {
return ObjectMapperFactory.getObjectMapper().writeValueAsString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
public interface ExecutorParamChecker {
public abstract List<JobType> getJobTypeList();

public abstract Object checkAndParseJob(JobDO jobDO) throws Exception;
public abstract Object checkAndParseJob(JobDO jobDO, List<String> datasetIdList)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public List<JobType> getJobTypeList() {
}

@Override
public Object checkAndParseJob(JobDO jobDO) throws Exception {
public Object checkAndParseJob(JobDO jobDO, List<String> datasetIdList) throws Exception {
ModelJobParam modelJobParam = ModelJobParam.deserialize(jobDO.getParam());
modelJobParam.setDatasetIdList(datasetIdList);
modelJobParam.setJobID(jobDO.getId());
modelJobParam.setJobType(jobDO.getType());
// check the param
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ModelJobParam {
// the dataset information
private List<DatasetInfo> dataSetList;

@JsonIgnore private transient List<String> datasetIdList;
@JsonIgnore private transient DatasetInfo selfDataset;
@JsonIgnore private transient DatasetInfo labelProviderDataset;
@JsonIgnore private transient ModelJobRequest modelRequest = new ModelJobRequest();
Expand All @@ -58,6 +59,7 @@ public void check() throws Exception {
}
modelRequest.setJobID(jobID);
for (DatasetInfo datasetInfo : dataSetList) {
datasetInfo.setDatasetIdList(datasetIdList);
datasetInfo.check();
if (datasetInfo.getReceiveResult()) {
modelRequest
Expand Down Expand Up @@ -233,6 +235,14 @@ public void setLabelProviderDataset(DatasetInfo labelProviderDataset) {
this.labelProviderDataset = labelProviderDataset;
}

public List<String> getDatasetIdList() {
return datasetIdList;
}

public void setDatasetIdList(List<String> datasetIdList) {
this.datasetIdList = datasetIdList;
}

public boolean usePSI() {
if (this.modelRequest == null || this.modelRequest.getModelParam() == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.webank.wedpr.components.scheduler.executor.impl.model;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.WeDPRException;
import java.util.ArrayList;
Expand All @@ -31,6 +32,16 @@ public class DatasetInfo {
protected Boolean receiveResult = false;
protected List<String> idFields = new ArrayList<>(Arrays.asList(Constant.DEFAULT_ID_FIELD));

@JsonIgnore protected List<String> datasetIdList;

public List<String> getDatasetIdList() {
return datasetIdList;
}

public void setDatasetIdList(List<String> datasetIdList) {
this.datasetIdList = datasetIdList;
}

public FileMeta getDataset() {
return dataset;
}
Expand Down Expand Up @@ -63,7 +74,7 @@ public void check() {
if (this.dataset == null) {
throw new WeDPRException("Invalid ML job param for no dataset defined!");
}
dataset.check();
dataset.check(datasetIdList);
}

public List<String> getIdFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.webank.wedpr.core.protocol.StorageType;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.WeDPRException;
import java.util.List;
import lombok.SneakyThrows;

public class FileMeta {
Expand Down Expand Up @@ -129,15 +130,24 @@ public void resetType() {
}

@SneakyThrows(Exception.class)
public void check() {
public void check(List<String> datasetIdList) {
if (this.storageType == null) {
throw new WeDPRException("Not supported storageType: " + storageTypeStr);
}
Common.requireNonEmpty(owner, "owner");
Common.requireNonEmpty(ownerAgency, "ownerAgency");
if (this.path == null) {

// Note: both support datasetID and dataset path
if (this.datasetID != null && this.datasetID.isEmpty() && (datasetIdList != null)) {
if (!datasetIdList.contains(this.datasetID)) {
throw new WeDPRException(
"Invalid datasetID, datasetID must in datasetIdList set, datasetID: "
+ datasetID);
}
} else if (this.path == null) {
throw new WeDPRException("Invalid FileMeta, must define the filePath!");
}

Common.requireNonEmpty(path, "filePath");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@

package com.webank.wedpr.components.scheduler.executor.impl.model;

import com.webank.wedpr.components.dataset.service.DatasetServiceApi;
import com.webank.wedpr.components.storage.builder.StoragePathBuilder;
import com.webank.wedpr.core.protocol.StorageType;
import lombok.SneakyThrows;

public class FileMetaBuilder {
private final StoragePathBuilder storagePathBuilder;
private final DatasetServiceApi datasetService;

public FileMetaBuilder(StoragePathBuilder storagePathBuilder) {
public FileMetaBuilder(
StoragePathBuilder storagePathBuilder, DatasetServiceApi datasetService) {
this.datasetService = datasetService;
this.storagePathBuilder = storagePathBuilder;
}

public DatasetServiceApi getDatasetService() {
return datasetService;
}

@SneakyThrows(Exception.class)
public FileMeta build(StorageType storageType, String path, String owner, String agency) {
return new FileMeta(storageType, path, owner, agency);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public PSIExecutor(
@Override
public void prepare(JobDO jobDO) throws Exception {
// deserialize the jobParam
PSIJobParam psiJobParam = (PSIJobParam) this.jobChecker.checkAndParseParam(jobDO);
PSIJobParam psiJobParam = (PSIJobParam) this.jobChecker.checkAndParseParam(jobDO, null);
psiJobParam.setTaskID(jobDO.getTaskID());
preparePSIJob(jobDO, psiJobParam);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public List<JobType> getJobTypeList() {
}

@Override
public Object checkAndParseJob(JobDO jobDO) throws Exception {
public Object checkAndParseJob(JobDO jobDO, List<String> datasetIdList) throws Exception {
// deserialize the jobParam
PSIJobParam psiJobParam = PSIJobParam.deserialize(jobDO.getParam());
psiJobParam.setJobID(jobDO.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.webank.wedpr.components.scheduler.executor.impl.psi.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.webank.wedpr.components.dataset.service.DatasetServiceApi;
import com.webank.wedpr.components.scheduler.executor.impl.ExecutorConfig;
import com.webank.wedpr.components.scheduler.executor.impl.model.DatasetInfo;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMeta;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.CSVFileParser;
import com.webank.wedpr.core.utils.Common;
Expand Down Expand Up @@ -65,7 +67,7 @@ public void checkAndResetPath(FileMetaBuilder fileMetaBuilder, String jobID)
if (dataset == null) {
throw new WeDPRException("Invalid PSI Request, must define the input dataset!");
}
dataset.check();
dataset.check(datasetIdList);
if (idFields == null || idFields.isEmpty()) {
throw new WeDPRException("Must define the field list to run PSI!");
}
Expand Down Expand Up @@ -226,8 +228,28 @@ public void prepare(
partyInfo.getDataset().toString());
long startT = System.currentTimeMillis();

StoragePath storagePath = null;
String datasetID = partyInfo.getDataset().getDatasetID();
if (datasetID != null) {
// Note: get the storage path by dataset id
DatasetServiceApi datasetService = fileMetaBuilder.getDatasetService();
storagePath = datasetService.getDatasetStoragePath(datasetID);
startT = System.currentTimeMillis();
logger.info(
"Prepare PSI, get storage path {} by datasetID success, timecost: {}",
storagePath,
System.currentTimeMillis() - startT);
} else {
storagePath = partyInfo.getDataset().getStoragePath();
startT = System.currentTimeMillis();
logger.info(
"Prepare PSI, get storage path {} success, timecost: {}",
storagePath,
System.currentTimeMillis() - startT);
}

// download the input dataset according to the path
storage.download(partyInfo.getDataset().getStoragePath(), downloadedFilePath);
storage.download(storagePath, downloadedFilePath);
logger.info(
"Prepare PSI, download file from {}=>{} success, timecost: {}",
partyInfo.getDataset().getPath(),
Expand Down
Loading

0 comments on commit 9ee461a

Please sign in to comment.