Skip to content

Commit

Permalink
add get dataset storage path by dataset id
Browse files Browse the repository at this point in the history
  • Loading branch information
ywy2090 committed Aug 30, 2024
1 parent fa836fd commit aaa9b49
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static com.webank.wedpr.components.dataset.service.ChunkUploadImpl.UPLOAD_CHUNK_FILE_NAME_PREFIX;

import com.webank.wedpr.components.dataset.common.DatasetConstant;
import com.webank.wedpr.core.utils.Common;
import java.io.File;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -68,4 +70,21 @@ public String getDatasetChunksDir(String datasetId) {
String datasetChunksBaseDir = getDatasetChunksBaseDir();
return String.format("%s/%s", datasetChunksBaseDir, datasetId);
}

/**
 * Builds the relative storage path under which a user's dataset is stored.
 *
 * <p>Layout for a dynamic data source: {@code ${user}/dy/${currentTimeMillis}/${datasetId}},
 * joined with {@link File#separator}. Layout otherwise: {@code ${user}/${datasetId}}, joined
 * by {@code Common.joinPath}.
 *
 * @param user name of the user the dataset belongs to
 * @param datasetId id of the dataset
 * @param dynamic whether the path is for a dynamic data source; when {@code true} the current
 *     time in milliseconds becomes part of the path, so every call yields a different path
 * @return the relative storage path
 */
public String getDatasetStoragePath(String user, String datasetId, boolean dynamic) {
    if (!dynamic) {
        // ${user}/${datasetId}
        return Common.joinPath(user, datasetId);
    }

    // ${user}/dy/${currentTimeMillis}/${datasetId}
    final String timestamp = String.valueOf(System.currentTimeMillis());
    return String.join(File.separator, user, "dy", timestamp, datasetId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.webank.wedpr.components.dataset.utils.TimeUtils;
import com.webank.wedpr.components.dataset.utils.UserTokenUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.WeDPRResponse;
Expand Down Expand Up @@ -456,4 +457,40 @@ public WeDPRResponse updateDatasetList(

return weDPRResponse;
}

/**
 * HTTP GET handler that returns the storage path of a dataset.
 *
 * <p>The response starts out as a success response; on any exception its code is switched to
 * {@code Constant.WEDPR_FAILED} and its message to the exception message. The elapsed time is
 * logged in both cases.
 *
 * <p>NOTE(review): {@code httpServletRequest} is not read here, so no caller identity or
 * permission check happens in this handler - confirm that this is intended.
 *
 * @param httpServletRequest the incoming request (currently unused)
 * @param datasetID id of the dataset whose storage path is requested
 * @return the response carrying the storage path as data, or the failure code and message
 */
@GetMapping(value = "getDatasetStoragePath")
public WeDPRResponse getDatasetStoragePath(
        HttpServletRequest httpServletRequest,
        @RequestParam(value = "datasetID", required = true) String datasetID) {

    final long beginMillis = System.currentTimeMillis();
    logger.info("get dataset storage path begin, datasetID: {}", datasetID);

    final WeDPRResponse response =
            new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);

    try {
        final StoragePath path = datasetService.getDatasetStoragePath(datasetID);
        response.setData(path);

        final long costMillis = System.currentTimeMillis() - beginMillis;
        logger.info(
                "get dataset storage path success, datasetID: {}, cost(ms): {}",
                datasetID,
                costMillis);
    } catch (Exception e) {
        // Report the failure to the caller instead of propagating the exception.
        response.setCode(Constant.WEDPR_FAILED);
        response.setMsg(e.getMessage());

        final long costMillis = System.currentTimeMillis() - beginMillis;
        logger.error(
                "get dataset storage path failed, datasetID: {}, cost(ms): {}, e: ",
                datasetID,
                costMillis,
                e);
    }

    return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.util.Arrays;
Expand Down Expand Up @@ -164,12 +163,13 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
String userDatasetPath =
WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
datasetConfig.getDatasetStoragePath(userInfo.getUser(), datasetId, false);

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

Expand All @@ -179,6 +179,7 @@ public void uploadData() throws DatasetException {
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.io.File;
Expand Down Expand Up @@ -149,12 +148,15 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DataSourceMeta dataSourceMeta = dataSourceProcessorContext.getDataSourceMeta();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
String userDatasetPath =
WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
datasetConfig.getDatasetStoragePath(
userInfo.getUser(), datasetId, dataSourceMeta.dynamicDataSource());

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

Expand All @@ -164,6 +166,7 @@ public void uploadData() throws DatasetException {
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ public interface DataSourceProcessor {
default void setContext(DataSourceProcessorContext context) {}

// prepare data
// ie:
// merge chunk data
// convert excel to csv
// e.g. merge chunk data, convert excel to csv
void prepareData() throws DatasetException;

// analyze data
Expand All @@ -32,9 +30,9 @@ default void setContext(DataSourceProcessorContext context) {}
// process
default void processData(DataSourceProcessorContext context) throws DatasetException {
try {
// init context
setContext(context);
// preprocess data
// e.g. convert data to .csv format, and other operations
prepareData();
// data analysis, reading data fields and data volume
analyzeData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper;
import com.webank.wedpr.components.dataset.service.ChunkUploadApi;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import lombok.Builder;
import lombok.Data;

Expand All @@ -25,4 +26,5 @@ public class DataSourceProcessorContext {
// intermediate state
private String cvsFilePath;
private String mergedFilePath;
private StoragePath storagePath;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.webank.wedpr.components.dataset.datasource.storage;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.webank.wedpr.components.dataset.common.DatasetStatus;
import com.webank.wedpr.components.dataset.config.DatasetConfig;
import com.webank.wedpr.components.dataset.dao.Dataset;
import com.webank.wedpr.components.dataset.dao.UserInfo;
import com.webank.wedpr.components.dataset.datasource.DataSourceMeta;
import com.webank.wedpr.components.dataset.datasource.dispatch.DataSourceProcessorDispatcher;
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessor;
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessorContext;
import com.webank.wedpr.components.dataset.exception.DatasetException;
import com.webank.wedpr.components.dataset.mapper.DatasetMapper;
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.components.storage.builder.StoragePathBuilder;
import com.webank.wedpr.core.utils.WeDPRException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Resolves the {@link StoragePath} of a dataset.
 *
 * <p>For a non-dynamic data source the path is rebuilt from the storage type and storage path
 * recorded on the {@link Dataset}. For a dynamic data source the data source processor is run
 * and the path it records on the processing context is returned.
 */
@Component
public class DatasetStoragePathRetriever {

    private static final Logger logger = LoggerFactory.getLogger(DatasetStoragePathRetriever.class);

    @Autowired private DatasetConfig datasetConfig;
    // Injected but not read in this class; lookups go through the transactional wrapper.
    @Autowired private DatasetMapper datasetMapper;
    @Autowired private DatasetTransactionalWrapper datasetTransactionalWrapper;

    @Qualifier("fileStorage")
    @Autowired
    private FileStorageInterface fileStorage;

    /**
     * Gets the storage path of the dataset with the given id.
     *
     * <p>NOTICE: This method may block for a long time.
     *
     * @param datasetID id of the dataset to look up
     * @return the storage path of the dataset
     * @throws DatasetException if no dataset with that id is found, or if resolving the path of
     *     the found dataset fails
     */
    public StoragePath getDatasetStoragePath(String datasetID) throws DatasetException {

        // Named "mapper" so that it does not shadow the injected datasetMapper field.
        DatasetMapper mapper = datasetTransactionalWrapper.getDatasetMapper();
        Dataset dataset = mapper.getDatasetByDatasetId(datasetID, false);
        if (dataset == null) {
            logger.error("dataset not found, dataset id: {}", datasetID);
            throw new DatasetException("dataset not found, dataset id: " + datasetID);
        }

        return getDatasetStoragePath(dataset);
    }

    /**
     * Gets the storage path of the given dataset.
     *
     * <p>NOTICE: This method may block for a long time.
     *
     * @param dataset the dataset record; must not be {@code null}
     * @return the storage path of the dataset
     * @throws DatasetException if the dataset status is not {@code DatasetStatus.Success}, or if
     *     the storage path cannot be built or produced
     */
    public StoragePath getDatasetStoragePath(Dataset dataset) throws DatasetException {
        String datasetID = dataset.getDatasetId();
        int status = dataset.getStatus();
        if (status != DatasetStatus.Success.getCode().intValue()) {
            logger.error(
                    "dataset is not available status, dataset id: {}, status: {}",
                    datasetID,
                    status);
            throw new DatasetException(
                    "dataset is not available status, dataset id: "
                            + datasetID
                            + " status: "
                            + status);
        }

        String strDataSourceType = dataset.getDataSourceType();
        String strDataSourceMeta = dataset.getDataSourceMeta();

        // NOTE(review): a fresh dispatcher is created on every call rather than injected;
        // confirm the dispatcher needs no container-managed dependencies.
        DataSourceProcessorDispatcher dataSourceProcessorDispatcher =
                new DataSourceProcessorDispatcher();

        DataSourceProcessor dataSourceProcessor =
                dataSourceProcessorDispatcher.getDataSourceProcessor(strDataSourceType);
        DataSourceMeta dataSourceMeta = dataSourceProcessor.parseDataSourceMeta(strDataSourceMeta);

        if (dataSourceMeta.dynamicDataSource()) {
            // dynamic data source: the path is produced by running the processor
            return processDynamicDatasourceForStoragePath(
                    dataset, dataSourceMeta, dataSourceProcessor);
        }

        // static data source: rebuild the path from what is recorded on the dataset
        return createStoragePath(dataset.getDatasetStorageType(), dataset.getDatasetStoragePath());
    }

    /**
     * Builds a {@link StoragePath} from its storage type and serialized storage path.
     *
     * @param strStorageType the storage type string
     * @param strStoragePath the storage path string
     * @return the storage path instance built by {@code StoragePathBuilder}
     * @throws DatasetException wrapping the {@code WeDPRException} or {@code
     *     JsonProcessingException} raised by the builder
     */
    public StoragePath createStoragePath(String strStorageType, String strStoragePath)
            throws DatasetException {
        try {
            return StoragePathBuilder.getInstance(strStorageType, strStoragePath);
        } catch (WeDPRException e) {
            throw new DatasetException(e);
        } catch (JsonProcessingException e) {
            throw new DatasetException(e);
        }
    }

    /**
     * Runs the data source processor for a dynamic data source and returns the storage path it
     * recorded on the processing context.
     *
     * @param dataset the dataset record; its owner user and agency are used as the user info
     * @param dataSourceMeta the parsed data source meta of the dataset
     * @param dataSourceProcessor the processor matching the dataset's data source type
     * @return the storage path recorded on the context by the processor; never {@code null}
     * @throws DatasetException if processing fails, or if processing completes without a
     *     storage path being recorded on the context
     */
    public StoragePath processDynamicDatasourceForStoragePath(
            Dataset dataset, DataSourceMeta dataSourceMeta, DataSourceProcessor dataSourceProcessor)
            throws DatasetException {

        String ownerUserName = dataset.getOwnerUserName();
        String ownerAgencyName = dataset.getOwnerAgencyName();

        UserInfo userInfo = UserInfo.builder().user(ownerUserName).agency(ownerAgencyName).build();

        DataSourceProcessorContext context =
                DataSourceProcessorContext.builder()
                        .dataset(dataset)
                        .dataSourceMeta(dataSourceMeta)
                        .datasetConfig(datasetConfig)
                        .userInfo(userInfo)
                        .datasetTransactionalWrapper(datasetTransactionalWrapper)
                        .fileStorage(fileStorage)
                        .build();

        try {
            dataSourceProcessor.setContext(context);
            dataSourceProcessor.processData(context);
            StoragePath storagePath = context.getStoragePath();
            if (storagePath == null) {
                // Fail loudly: returning null here would surface as a "successful" empty result.
                throw new DatasetException(
                        "process dynamic data source finished without storage path, dataset id: "
                                + dataset.getDatasetId());
            }

            logger.info(
                    "process dynamic data source success, dataset id: {}, datasource type: {}, datasource meta: {}, storage path: {}",
                    dataset.getDatasetId(),
                    dataset.getDataSourceType(),
                    dataset.getDataSourceMeta(),
                    storagePath);

            return storagePath;
        } catch (Exception e) {
            logger.error(
                    "process dynamic data source exception, dataset id: {}, datasource type: {}, datasource meta: {}, e: ",
                    dataset.getDatasetId(),
                    dataset.getDataSourceType(),
                    dataset.getDataSourceMeta(),
                    e);
            throw e;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.webank.wedpr.components.dataset.message.CreateDatasetResponse;
import com.webank.wedpr.components.dataset.message.ListDatasetResponse;
import com.webank.wedpr.components.dataset.message.UpdateDatasetRequest;
import com.webank.wedpr.components.storage.api.StoragePath;
import java.util.List;

public interface DatasetServiceApi {
Expand Down Expand Up @@ -88,4 +89,13 @@ ListDatasetResponse listDataset(
Integer pageOffset,
Integer pageSize)
throws DatasetException;

/**
 * Gets the storage path of the dataset with the given id.
 *
 * @param datasetID id of the dataset whose storage path is requested
 * @return the storage path of the dataset
 * @throws DatasetException if the storage path cannot be obtained
 */
StoragePath getDatasetStoragePath(String datasetID) throws DatasetException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.webank.wedpr.components.dataset.datasource.dispatch.DataSourceProcessorDispatcher;
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessor;
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessorContext;
import com.webank.wedpr.components.dataset.datasource.storage.DatasetStoragePathRetriever;
import com.webank.wedpr.components.dataset.exception.DatasetException;
import com.webank.wedpr.components.dataset.mapper.DatasetMapper;
import com.webank.wedpr.components.dataset.mapper.DatasetPermissionMapper;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class DatasetServiceImpl implements DatasetServiceApi {
@Autowired private DatasetTransactionalWrapper datasetTransactionalWrapper;
@Autowired private DataSourceProcessorDispatcher dataSourceProcessorDispatcher;
@Autowired private DatasetWrapper datasetWrapper;
@Autowired private DatasetStoragePathRetriever datasetStoragePathRetriever;

@Qualifier("fileStorage")
@Autowired
Expand Down Expand Up @@ -552,4 +554,16 @@ public ListDatasetResponse listDataset(
"query visible datasets for user db operation exception, " + e.getMessage());
}
}

/**
 * Gets the storage path of a dataset by delegating to the injected {@code
 * DatasetStoragePathRetriever}.
 *
 * @param datasetID id of the dataset whose storage path is requested
 * @return the storage path returned by the retriever
 * @throws DatasetException propagated unchanged from the retriever
 */
@Override
public StoragePath getDatasetStoragePath(String datasetID) throws DatasetException {
    final StoragePath storagePath = datasetStoragePathRetriever.getDatasetStoragePath(datasetID);
    return storagePath;
}
}
Loading

0 comments on commit aaa9b49

Please sign in to comment.