Skip to content

Commit

Permalink
Merge pull request #30 from ywy2090/feature-milestone2
Browse files Browse the repository at this point in the history
add get dataset storage path by dataset id
  • Loading branch information
ywy2090 authored Sep 1, 2024
2 parents e46eacd + 37a1565 commit 89a39e4
Show file tree
Hide file tree
Showing 16 changed files with 413 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static com.webank.wedpr.components.dataset.service.ChunkUploadImpl.UPLOAD_CHUNK_FILE_NAME_PREFIX;

import com.webank.wedpr.components.dataset.common.DatasetConstant;
import com.webank.wedpr.core.utils.Common;
import java.io.File;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -68,4 +70,21 @@ public String getDatasetChunksDir(String datasetId) {
String datasetChunksBaseDir = getDatasetChunksBaseDir();
return String.format("%s/%s", datasetChunksBaseDir, datasetId);
}

/**
 * Builds the storage path for a dataset owned by {@code user}.
 *
 * <p>A static dataset is stored directly under the user's directory. A dynamic
 * dataset (reloaded on each access) gets a millisecond-timestamped subdirectory
 * under {@code dy/} so successive loads never collide.
 *
 * @param user owner of the dataset (first path segment)
 * @param datasetId dataset identifier (last path segment)
 * @param dynamic whether the dataset is a dynamic data source
 * @return the relative storage path for the dataset
 */
public String getDatasetStoragePath(String user, String datasetId, boolean dynamic) {
    if (!dynamic) {
        // ${user}/${datasetId}
        return Common.joinPath(user, datasetId);
    }
    // ${user}/dy/${currentTimeMillis}/${datasetId}
    return String.join(
            File.separator,
            user,
            "dy",
            Long.toString(System.currentTimeMillis()),
            datasetId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.webank.wedpr.components.dataset.utils.TimeUtils;
import com.webank.wedpr.components.dataset.utils.UserTokenUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.WeDPRResponse;
Expand Down Expand Up @@ -456,4 +457,40 @@ public WeDPRResponse updateDatasetList(

return weDPRResponse;
}

/**
 * GET endpoint returning the storage path of the dataset identified by {@code datasetID}.
 *
 * <p>On success the {@code StoragePath} is set as the response payload; on any
 * failure the response carries {@code WEDPR_FAILED} and the exception message.
 * Elapsed time is logged in both cases.
 *
 * @param httpServletRequest the incoming request (unused here, kept for interface symmetry)
 * @param datasetID id of the dataset to resolve
 * @return a {@link WeDPRResponse} wrapping the storage path or an error
 */
@GetMapping(value = "getDatasetStoragePath")
public WeDPRResponse getDatasetStoragePath(
        HttpServletRequest httpServletRequest,
        @RequestParam(value = "datasetID", required = true) String datasetID) {

    long begin = System.currentTimeMillis();
    logger.info("get dataset storage path begin, datasetID: {}", datasetID);

    WeDPRResponse response =
            new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);
    try {
        response.setData(datasetService.getDatasetStoragePath(datasetID));
        logger.info(
                "get dataset storage path success, datasetID: {}, cost(ms): {}",
                datasetID,
                (System.currentTimeMillis() - begin));
    } catch (Exception e) {
        response.setCode(Constant.WEDPR_FAILED);
        response.setMsg(e.getMessage());
        logger.error(
                "get dataset storage path failed, datasetID: {}, cost(ms): {}, e: ",
                datasetID,
                (System.currentTimeMillis() - begin),
                e);
    }

    return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class DBDataSource implements DataSourceMeta {
private String sql;
// Data is loaded once when a data source is created, or on each access
Boolean dynamicDataSource = false;
// verify sql syntax and test connectivity
boolean verifySqlSyntaxAndTestCon = true;

@Override
public boolean dynamicDataSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.util.Arrays;
Expand Down Expand Up @@ -164,12 +163,13 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
String userDatasetPath =
WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
datasetConfig.getDatasetStoragePath(userInfo.getUser(), datasetId, false);

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

Expand All @@ -179,6 +179,7 @@ public void uploadData() throws DatasetException {
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.io.File;
Expand Down Expand Up @@ -57,8 +56,12 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas

// check if single select
SQLUtils.isSingleSelectStatement(sql);
// validate parameters, test db connectivity, validate SQL syntax
SQLUtils.validateDataSourceParameters(dbType, dbDataSource);

boolean verifySqlSyntaxAndTestCon = dbDataSource.isVerifySqlSyntaxAndTestCon();
if (verifySqlSyntaxAndTestCon) {
// validate parameters, test db connectivity, validate SQL syntax
SQLUtils.validateDataSourceParameters(dbType, dbDataSource);
}

long endTimeMillis = System.currentTimeMillis();
logger.info(
Expand Down Expand Up @@ -91,7 +94,7 @@ public void prepareData() throws DatasetException {

long endTimeMillis = System.currentTimeMillis();
logger.info(
" ==> data source processor stage prepare data end merge chunk data, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
" ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
datasetId,
cvsFilePath,
endTimeMillis - startTimeMillis);
Expand Down Expand Up @@ -149,12 +152,15 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DataSourceMeta dataSourceMeta = dataSourceProcessorContext.getDataSourceMeta();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
String userDatasetPath =
WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
datasetConfig.getDatasetStoragePath(
userInfo.getUser(), datasetId, dataSourceMeta.dynamicDataSource());

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

Expand All @@ -164,6 +170,7 @@ public void uploadData() throws DatasetException {
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ public interface DataSourceProcessor {
default void setContext(DataSourceProcessorContext context) {}

// prepare data
// ie:
// merge chunk data
// convert excel to csv
// ie: merge chunk data 、convert excel to csv
void prepareData() throws DatasetException;

// analyze data
Expand All @@ -32,9 +30,9 @@ default void setContext(DataSourceProcessorContext context) {}
// process
default void processData(DataSourceProcessorContext context) throws DatasetException {
try {
// init context
setContext(context);
// preprocess data
// ie: convert data to .cvs format, and other operations
prepareData();
// data analysis, reading data fields and data volume
analyzeData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper;
import com.webank.wedpr.components.dataset.service.ChunkUploadApi;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import lombok.Builder;
import lombok.Data;

Expand All @@ -25,4 +26,5 @@ public class DataSourceProcessorContext {
// intermediate state
private String cvsFilePath;
private String mergedFilePath;
private StoragePath storagePath;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.webank.wedpr.components.dataset.datasource.processor;

import com.webank.wedpr.components.dataset.config.DatasetConfig;
import com.webank.wedpr.components.dataset.dao.Dataset;
import com.webank.wedpr.components.dataset.datasource.DataSourceMeta;
import com.webank.wedpr.components.dataset.datasource.category.HdfsDataSource;
import com.webank.wedpr.components.dataset.exception.DatasetException;
import com.webank.wedpr.components.dataset.utils.CsvUtils;
import com.webank.wedpr.components.dataset.utils.FileUtils;
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.impl.hdfs.HDFSStoragePath;
import com.webank.wedpr.core.protocol.StorageType;
import com.webank.wedpr.core.utils.Common;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +41,15 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas
Common.requireNonEmpty("filePath", filePath);

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

StorageType storageType = fileStorage.type();
if (!storageType.getName().equalsIgnoreCase(StorageType.HDFS.getName())) {
// NOT HDFS Storage
logger.error("Not supported for HDFS data source, type: {}", storageType);
throw new DatasetException(
"Not supported for HDFS data source, type: " + storageType.getName());
}

checkFileExists(fileStorage, filePath);

long endTimeMillis = System.currentTimeMillis();
Expand All @@ -56,16 +73,108 @@ public void checkFileExists(FileStorageInterface storageInterface, String filePa
}

@Override
public void prepareData() throws DatasetException {}
/**
 * Prepare stage for an HDFS data source: download the remote file to a local
 * working path ({@code <datasetBaseDir>/<datasetId>}) and record that path in
 * the processing context for the later analyze/cleanup stages.
 *
 * @throws DatasetException if the download fails
 */
@Override
public void prepareData() throws DatasetException {
    long begin = System.currentTimeMillis();

    DataSourceProcessorContext context = dataSourceProcessorContext;
    Dataset dataset = context.getDataset();
    String datasetId = dataset.getDatasetId();

    // Local landing path: <datasetBaseDir>/<datasetId>
    String localCsvPath =
            context.getDatasetConfig().getDatasetBaseDir() + File.separator + datasetId;

    // Pull the source file down from HDFS to the local landing path.
    HdfsDataSource hdfsDataSource = (HdfsDataSource) context.getDataSourceMeta();
    context.getFileStorage()
            .download(new HDFSStoragePath(hdfsDataSource.getFilePath()), localCsvPath);

    // Subsequent stages (analyzeData/cleanupData) read the file from the context.
    context.setCvsFilePath(localCsvPath);

    logger.info(
            " ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
            datasetId,
            localCsvPath,
            System.currentTimeMillis() - begin);
}

@Override
public void analyzeData() throws DatasetException {}
/**
 * Analyze stage: read the downloaded CSV and populate dataset metadata —
 * comma-joined header field names, column count, row count, MD5 content hash
 * and file size — on the dataset held in the processing context.
 *
 * @throws DatasetException if the CSV cannot be read or hashed
 */
@Override
public void analyzeData() throws DatasetException {
    String cvsFilePath = dataSourceProcessorContext.getCvsFilePath();
    Dataset dataset = dataSourceProcessorContext.getDataset();

    long startTimeMillis = System.currentTimeMillis();

    // read csv header field
    List<String> fieldList = CsvUtils.readCsvHeader(cvsFilePath);

    // [ x, y ,z] => x,y,z
    // NOTE(fix): the previous Arrays.toString(...) approach left ", " between
    // fields (contradicting the comment above) and its replace("\\r", "")
    // matched a literal backslash + 'r' rather than a carriage return.
    // Sanitize each field individually and join with a bare comma instead.
    StringBuilder fieldBuilder = new StringBuilder();
    for (String field : fieldList) {
        if (fieldBuilder.length() > 0) {
            fieldBuilder.append(",");
        }
        fieldBuilder.append(field.replace("'", "").replace("\r", "").trim());
    }
    String fieldString = fieldBuilder.toString();

    int columnNum = fieldList.size();
    int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
    String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
    long fileSize = FileUtils.getFileSize(cvsFilePath);

    this.dataSourceProcessorContext.getDataset().setDatasetFields(fieldString);
    this.dataSourceProcessorContext.getDataset().setDatasetColumnCount(columnNum);
    this.dataSourceProcessorContext.getDataset().setDatasetRecordCount(rowNum);
    this.dataSourceProcessorContext.getDataset().setDatasetVersionHash(md5Hash);
    this.dataSourceProcessorContext.getDataset().setDatasetSize(fileSize);

    String datasetId = dataset.getDatasetId();

    long endTimeMillis = System.currentTimeMillis();
    logger.info(
            " => data source processor stage analyze data end, datasetId: {}, fieldString: {}, columnNum: {}, rowNum: {}, cost(ms): {}",
            datasetId,
            fieldString,
            columnNum,
            rowNum,
            endTimeMillis - startTimeMillis);
}

@Override
public void uploadData() throws DatasetException {
    // Intentional no-op: for this processor the data appears to already live in
    // the backing HDFS storage (prepareData only downloads a local working copy
    // for analysis), so there is nothing to upload here.
    // NOTE(review): assumption inferred from this file's prepareData — confirm
    // against the DataSourceProcessor pipeline contract.
}

@Override
public void cleanupData() throws DatasetException {}
/**
 * Cleanup stage: delete the temporary local CSV directory
 * ({@code <datasetBaseDir>/<datasetId>}) created by {@code prepareData}.
 * Deletion failures are logged (best-effort) and do not abort processing.
 *
 * @throws DatasetException declared by the processor interface; not thrown here
 */
@Override
public void cleanupData() throws DatasetException {
    long startTimeMillis = System.currentTimeMillis();
    DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();
    Dataset dataset = dataSourceProcessorContext.getDataset();

    String datasetId = dataset.getDatasetId();
    String datasetBaseDir = datasetConfig.getDatasetBaseDir();
    String cvsFilePath = datasetBaseDir + File.separator + datasetId;
    try {
        FileUtils.deleteDirectory(new File(cvsFilePath));
        logger.info(
                "remove temp csv success, datasetId: {}, cvsFilePath: {}",
                datasetId,
                cvsFilePath);
    } catch (Exception e) {
        // FIX: pass the exception to the logger — previously the cause and
        // stack trace were silently dropped. SLF4J treats a trailing Throwable
        // argument specially and prints its stack trace.
        logger.warn(
                "remove temp csv failed, datasetId: {}, cvsFilePath: {}",
                datasetId,
                cvsFilePath,
                e);
    }

    long endTimeMillis = System.currentTimeMillis();
    logger.info(
            " => data source processor stage cleanup data end, datasetId: {}, cost(ms): {}",
            datasetId,
            endTimeMillis - startTimeMillis);
}
}
Loading

0 comments on commit 89a39e4

Please sign in to comment.