Skip to content

Commit

Permalink
Lots of Goodies
Browse files Browse the repository at this point in the history
New Dataflow editor
Support Different Delimiters
Auto Detect File delimiters
Bug Fixes
New Age Look and Feel
  • Loading branch information
datasetutil committed May 9, 2015
1 parent 6f2af10 commit b0f10d9
Show file tree
Hide file tree
Showing 21 changed files with 1,250 additions and 162 deletions.
44 changes: 44 additions & 0 deletions src/main/java/com/sforce/dataset/Config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2014, salesforce.com, inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the
* following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
* the following disclaimer in the documentation and/or other materials provided with the distribution.
*
* Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

package com.sforce.dataset;



public class Config {
	// Settings bean loaded from / written to settings.json via Jackson
	// (see DatasetUtilConstants.getSystemConfig); public fields allow
	// direct field-based (de)serialization.

	// Proxy authentication credentials; null means no proxy auth is configured.
	public String proxyUsername = null;
	public String proxyPassword = null;
	// NTLM domain for NTLM-authenticated proxies; null when not using NTLM.
	public String proxyNtlmDomain = null;
	// Proxy endpoint; null host / port 0 means no proxy is configured.
	public String proxyHost = null;
	public int proxyPort = 0;

	// presumably the overall request/read timeout in seconds — TODO confirm against usage
	public int timeoutSecs = 540;
	// presumably the connection-establishment timeout in seconds — TODO confirm against usage
	public int connectionTimeoutSecs = 60;

	// When true, presumably disables compressed uploads/transfers — TODO confirm against usage
	public boolean noCompression = false;

	// When true, presumably enables verbose debug output — TODO confirm against usage
	public boolean debugMessages = false;
}
62 changes: 62 additions & 0 deletions src/main/java/com/sforce/dataset/DatasetUtilConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,18 @@
package com.sforce.dataset;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.input.BOMInputStream;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sforce.dataset.util.DatasetUtils;


public class DatasetUtilConstants {
Expand All @@ -52,11 +61,16 @@ public class DatasetUtilConstants {
public static final String errorDirName = "error";
public static final String dataDirName = "data";
public static final String configDirName = "config";
public static final String configFileName = "settings.json";
public static final String debugFileName = "debug.log";



public static final String errorCsvParam = "ERROR_CSV";
public static final String metadataJsonParam = "METADATA_JSON";
public static final String hdrIdParam = "HEADER_ID";
public static final String serverStatusParam = "SERVER_STATUS";
public static final String clientId = "com.sforce.dataset.utils";



Expand Down Expand Up @@ -139,4 +153,52 @@ public static final File getConfigDir(String orgId)
return logsDir;
}

public static final File getDebugFile()
{
File logsDir = new File(DatasetUtilConstants.currentDir,logsDirName);
try {
FileUtils.forceMkdir(logsDir);
} catch (IOException e) {
e.printStackTrace();
}
File debugFile = new File(logsDir,debugFileName);
return debugFile;
}

public static final Config getSystemConfig()
{
Config conf = new Config();
File configDir = new File(DatasetUtilConstants.currentDir,configDirName);
try {
FileUtils.forceMkdir(configDir);
} catch (IOException e) {
e.printStackTrace();
}
File configFile = new File(configDir,configFileName);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
if(configFile != null && configFile.exists())
{
InputStreamReader reader = null;
try {
reader = new InputStreamReader(new BOMInputStream(new FileInputStream(configFile), false), DatasetUtils.utf8Decoder(null , Charset.forName("UTF-8")));
conf = mapper.readValue(reader, Config.class);
} catch (Throwable e) {
e.printStackTrace();
}finally
{
IOUtils.closeQuietly(reader);
}
}else
{
try
{
mapper.writerWithDefaultPrettyPrinter().writeValue(configFile, conf);
} catch (Throwable e) {
e.printStackTrace();
}
}
return conf;
}

}
25 changes: 12 additions & 13 deletions src/main/java/com/sforce/dataset/DatasetUtilMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,7 @@ else if(args[i-1].equalsIgnoreCase("--codingErrorAction"))
}catch(Throwable me)
{
}
}

}
}

if(params.sessionId==null)
Expand Down Expand Up @@ -382,15 +381,15 @@ else if(args[i-1].equalsIgnoreCase("--codingErrorAction"))
System.out.println("*******************************************************************************\n");
System.out.println();

System.out.println("\n*******************************************************************************");
try {
DatasetUtilServer datasetUtilServer = new DatasetUtilServer();
datasetUtilServer.init(args, false);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("*******************************************************************************\n");
System.out.println();
// System.out.println("\n*******************************************************************************");
// try {
// DatasetUtilServer datasetUtilServer = new DatasetUtilServer();
// datasetUtilServer.init(args, false);
// } catch (Exception e) {
// e.printStackTrace();
// }
// System.out.println("*******************************************************************************\n");
// System.out.println();

while(true)
{
Expand Down Expand Up @@ -437,7 +436,7 @@ public static void printUsage()
System.out.println("--sessionId : (Optional) the salesforce sessionId. if specified,specify endpoint");
System.out.println("--fileEncoding : (Optional) the encoding of the inputFile default UTF-8");
System.out.println("--uploadFormat : (Optional) the whether to upload as binary or csv. default binary");
System.out.println("--createNewDateParts : (Optional) wether to create new date parts");
// System.out.println("--createNewDateParts : (Optional) wether to create new date parts");
// System.out.println("jsonConfig: (Optional) the dataflow definition json file");
System.out.println("*******************************************************************************\n");
System.out.println("Usage Example 1: Upload a csv to a dataset");
Expand Down Expand Up @@ -1003,7 +1002,7 @@ public static File validateInputFile(String inputFile, String action)
}

String ext = FilenameUtils.getExtension(temp.getName());
if(ext == null || !(ext.equalsIgnoreCase("csv") || ext.equalsIgnoreCase("bin") || ext.equalsIgnoreCase("gz") || ext.equalsIgnoreCase("json")))
if(ext == null || !(ext.equalsIgnoreCase("csv") || ext.equalsIgnoreCase("txt") || ext.equalsIgnoreCase("bin") || ext.equalsIgnoreCase("gz") || ext.equalsIgnoreCase("json")))
{
System.out.println("\nERROR: inputFile does not have valid extension");
return null;
Expand Down
63 changes: 53 additions & 10 deletions src/main/java/com/sforce/dataset/loader/DatasetLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,54 @@ public static boolean uploadDataset(String inputFileString,
schemaFile = ExternalFileSchema.getSchemaFile(inputFile, logger);
ExternalFileSchema schema = null;

if(schemaFile != null && schemaFile.exists() && schemaFile.length()>0)
//If this is incremental, fetch last uploaded json instead of generating a new one
if(schemaFile == null || !schemaFile.exists() || schemaFile.length()==0)
{
//If this is incremental, fetch last uploaded json instead of generating a new one
if(Operation.equalsIgnoreCase("Append") || (Operation.equalsIgnoreCase("Upsert")) || (Operation.equalsIgnoreCase("Delete")))
{
schema = getLastUploadedJson(partnerConnection, datasetAlias, logger);
if(schemaFile != null && schema !=null)
{
ExternalFileSchema.save(schemaFile, schema, logger);
}
}
}

// if(schema!=null)
// {
// if(schemaFile != null && schemaFile.exists() && schemaFile.length()>0)
// {
// ExternalFileSchema extSchema = ExternalFileSchema.load(schemaFile, inputFileCharset, logger);
// ExternalFileSchema.mergeExtendedFields(extSchema, schema, logger);
// }else if(schemaFile != null)
// {
// ExternalFileSchema.save(schemaFile, schema, logger);
// }
// }

CsvPreference pref = null;
String fileExt = FilenameUtils.getExtension(inputFile.getName());
boolean isParsable = false;
if(fileExt != null && (fileExt.equalsIgnoreCase("csv") || fileExt.equalsIgnoreCase("txt") ))
{
isParsable = true;
// if(!fileExt.equalsIgnoreCase("csv"))
// {
// char sep = SeparatorGuesser.guessSeparator(inputFile, inputFileCharset, true);
// if(sep!=0)
// {
// pref = new CsvPreference.Builder((char) CsvPreference.STANDARD_PREFERENCE.getQuoteChar(), sep, CsvPreference.STANDARD_PREFERENCE.getEndOfLineSymbols()).build();
// }else
// {
// throw new DatasetLoaderException("Failed to determine field Delimiter for file {"+inputFile+"}");
// }
// }else
// {
// pref = CsvPreference.STANDARD_PREFERENCE;
// }
}



if(session.isDone())
{
Expand All @@ -260,14 +300,14 @@ public static boolean uploadDataset(String inputFileString,
if(schema==null)
{
logger.println("\n*******************************************************************************");
if(FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv"))
if(isParsable)
{
if(schemaFile != null && schemaFile.exists() && schemaFile.length()>0)
session.setStatus("LOADING SCHEMA");
else
session.setStatus("DETECTING SCHEMA");

schema = ExternalFileSchema.init(inputFile, inputFileCharset, logger);
schema = ExternalFileSchema.init(inputFile, inputFileCharset,logger);
if(schema==null)
{
logger.println("Failed to parse schema file {"+ ExternalFileSchema.getSchemaFile(inputFile, logger) +"}");
Expand All @@ -277,7 +317,7 @@ public static boolean uploadDataset(String inputFileString,
{
if(schemaFile != null && schemaFile.exists() && schemaFile.length()>0)
session.setStatus("LOADING SCHEMA");
schema = ExternalFileSchema.load(inputFile, inputFileCharset, logger);
schema = ExternalFileSchema.load(inputFile, inputFileCharset,logger);
if(schema==null)
{
logger.println("Failed to load schema file {"+ ExternalFileSchema.getSchemaFile(inputFile, logger) +"}");
Expand All @@ -299,6 +339,9 @@ public static boolean uploadDataset(String inputFileString,
{
throw new DatasetLoaderException("Schema File {"+ExternalFileSchema.getSchemaFile(inputFile, logger) +"} has a uniqueId set. Choose 'Upsert' operation instead");
}

pref = new CsvPreference.Builder((char) CsvPreference.STANDARD_PREFERENCE.getQuoteChar(), schema.getFileFormat().getFieldsDelimitedBy().charAt(0), CsvPreference.STANDARD_PREFERENCE.getEndOfLineSymbols()).build();

}

if(session.isDone())
Expand Down Expand Up @@ -370,7 +413,7 @@ public static boolean uploadDataset(String inputFileString,
throw new DatasetLoaderException("Operation terminated on user request");
}

inputFile = CsvExternalSort.sortFile(inputFile, inputFileCharset, false, 1, schema);
inputFile = CsvExternalSort.sortFile(inputFile, inputFileCharset, false, 1, schema, pref);

if(session.isDone())
{
Expand All @@ -380,7 +423,7 @@ public static boolean uploadDataset(String inputFileString,
//Create the Bin file
//File binFile = new File(csvFile.getParent(), datasetName + ".bin");
File gzbinFile = inputFile;
if(!FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv"))
if(!isParsable)
{
if(!FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("gz") || !FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("zip"))
{
Expand All @@ -393,7 +436,7 @@ public static boolean uploadDataset(String inputFileString,
File lastgzbinFile = new File(datasetArchiveDir, hdrId + "." + FilenameUtils.getBaseName(inputFile.getName()) + ".gz");
if(!lastgzbinFile.exists())
{
if(uploadFormat.equalsIgnoreCase("binary") && FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv"))
if(uploadFormat.equalsIgnoreCase("binary") && isParsable)
{
FileOutputStream fos = null;
BufferedOutputStream out = null;
Expand All @@ -413,11 +456,11 @@ public static boolean uploadDataset(String inputFileString,
long errorRowCount = 0;
long startTime = System.currentTimeMillis();
EbinFormatWriter ebinWriter = new EbinFormatWriter(out, schema.getObjects().get(0).getFields().toArray(new FieldType[0]), logger);
ErrorWriter errorWriter = new ErrorWriter(inputFile,",");
ErrorWriter errorWriter = new ErrorWriter(inputFile,",", pref);

session.setParam(DatasetUtilConstants.errorCsvParam, errorWriter.getErrorFile().getAbsolutePath());

CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE);
CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), pref);
WriterThread writer = new WriterThread(q, ebinWriter, errorWriter, logger,session);
Thread th = new Thread(writer,"Writer-Thread");
th.setDaemon(true);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/sforce/dataset/loader/ErrorWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ErrorWriter {



public ErrorWriter(File inputCsv,String delimiter)
public ErrorWriter(File inputCsv,String delimiter, CsvPreference pref)
throws IOException
{
if(inputCsv==null|| !inputCsv.exists())
Expand All @@ -81,7 +81,7 @@ public ErrorWriter(File inputCsv,String delimiter)

this.delimiter = delimiter;

CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputCsv), false), DatasetUtils.utf8Decoder(CodingErrorAction.IGNORE, null)), CsvPreference.STANDARD_PREFERENCE);
CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputCsv), false), DatasetUtils.utf8Decoder(CodingErrorAction.IGNORE, null)), pref);
headerColumns = reader.getHeader(true);
reader.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public String getFieldsDelimitedBy() {
return fieldsDelimitedBy;
}
/**
 * Sets the field delimiter for this file format. A null or empty value is
 * ignored, so the previously configured (or default) delimiter is retained.
 *
 * @param fieldsDelimitedBy the delimiter string; ignored when null or empty
 */
public void setFieldsDelimitedBy(String fieldsDelimitedBy) {
	if (fieldsDelimitedBy == null || fieldsDelimitedBy.isEmpty()) {
		return; // keep the current delimiter
	}
	this.fieldsDelimitedBy = fieldsDelimitedBy;
}
public char getFieldsEnclosedBy() {
return fieldsEnclosedBy;
Expand Down
Loading

0 comments on commit b0f10d9

Please sign in to comment.